diff --git a/docs/user-guide/providers/okta/authentication.mdx b/docs/user-guide/providers/okta/authentication.mdx index c9154ee936..ca56a0535b 100644 --- a/docs/user-guide/providers/okta/authentication.mdx +++ b/docs/user-guide/providers/okta/authentication.mdx @@ -35,14 +35,18 @@ The bundled checks require the following read-only scopes: - `okta.policies.read` - `okta.brands.read` - `okta.apps.read` +- `okta.logStreams.read` +- `okta.idps.read` Additional scopes will be needed as more services and checks are added. These are the current ones needed: | Scope | Used by | |---|---| -| `okta.policies.read` | Sign-on, password, and authentication policies | +| `okta.policies.read` | Sign-on, password, authentication, and `USER_LIFECYCLE` (Workflow > Automations) policies | | `okta.brands.read` | Sign-in page customizations (DOD Notice and Consent Banner check) | | `okta.apps.read` | First-party app settings (Okta Admin Console session), integrated app inventory, and the Authentication Policies bound to Okta applications | +| `okta.logStreams.read` | Log Stream configuration (`/api/v1/logStreams`) | +| `okta.idps.read` | Identity Providers, including Smart Card (X509) IdPs (`/api/v1/idps`) | ### Required Admin Role @@ -68,7 +72,9 @@ Okta filters the first-party apps (`saasure`, `okta_enduser`) out of `/api/v1/ap A fifth check — `application_admin_console_session_idle_timeout_15min` (STIG V-273187) — also requires Super Administrator: it calls `GET /api/v1/first-party-app-settings/admin-console`, which returns `403 E0000006` for every role below Super Administrator. -When the service app runs with Read-Only Administrator, the five checks listed in this section return **MANUAL** instead of PASS/FAIL — the rest of the scan keeps running. +`user_inactivity_automation_35d_enabled` (STIG V-273188) reads `USER_LIFECYCLE` policies (`list_policies(type='USER_LIFECYCLE')`) using the `okta.policies.read` scope. The Read-Only Administrator role is enough to list them; no Super Administrator requirement. + +When the service app runs with Read-Only Administrator, the checks listed in this section return **MANUAL** instead of PASS/FAIL — the rest of the scan keeps running. Read-Only Administrator stays the recommended default for the least-privilege framing that aligns with DISA STIG. Assign Super Administrator on a separate run when full coverage of the first-party app checks is needed. @@ -158,8 +164,8 @@ export OKTA_PRIVATE_KEY_FILE="/secure/path/to/prowler-okta.pem" # or export OKTA_PRIVATE_KEY="$(cat /secure/path/to/prowler-okta.pem)" -# Optional — defaults to "okta.policies.read,okta.brands.read,okta.apps.read" -export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read" +# Optional — defaults to "okta.policies.read,okta.brands.read,okta.apps.read,okta.logStreams.read,okta.idps.read" +export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read,okta.logStreams.read,okta.idps.read" uv run python prowler-cli.py okta ``` diff --git a/docs/user-guide/providers/okta/getting-started-okta.mdx b/docs/user-guide/providers/okta/getting-started-okta.mdx index 66d3dd1808..6086afefb3 100644 --- a/docs/user-guide/providers/okta/getting-started-okta.mdx +++ b/docs/user-guide/providers/okta/getting-started-okta.mdx @@ -85,8 +85,8 @@ Follow the [Okta Authentication](/user-guide/providers/okta/authentication) guid export OKTA_ORG_DOMAIN="acme.okta.com" export OKTA_CLIENT_ID="0oa1234567890abcdef" export OKTA_PRIVATE_KEY_FILE="/secure/path/to/prowler-okta.pem" -# Optional — defaults to "okta.policies.read,okta.brands.read,okta.apps.read" -export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read" +# Optional — defaults to "okta.policies.read,okta.brands.read,okta.apps.read,okta.logStreams.read,okta.idps.read" +export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read,okta.logStreams.read,okta.idps.read" ``` The private key file may contain either a PEM-encoded RSA key or a JWK JSON document. @@ -143,10 +143,13 @@ prowler okta --config-file /path/to/config.yaml Prowler for Okta includes security checks across the following services: -| Service | Description | -| --------------- | -------------------------------------------------------------------------------------------------------------------------------------- | -| **Sign-On** | Global session policy controls (idle timeout, lifetime, rule priority and ordering) | -| **Application** | Okta Admin Console sign-on settings plus Authentication Policy controls for Okta applications (session idle, MFA, phishing resistance, network zones) | +| Service | Description | +| --------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------- | +| **Sign-On** | Global session policy controls (idle timeout, lifetime, rule priority and ordering) | +| **Application** | Okta Admin Console sign-on settings plus Authentication Policy controls for Okta applications (session idle, MFA, phishing resistance, network zones) | +| **User** | User lifecycle automations (inactivity-based deprovisioning) | +| **System Log** | Log Stream configuration that off-loads audit records to a central SIEM | +| **Identity Provider** | Identity Providers, including Smart Card (X509) IdP status and certificate-chain visibility | ## Troubleshooting @@ -158,11 +161,13 @@ This is stricter than simply finding the same timeout value somewhere else in th ### Default Scopes -Prowler requests a fixed set of OAuth scopes on every token exchange. The defaults cover every bundled check across the Sign-On and Application services: +Prowler requests a fixed set of OAuth scopes on every token exchange. The defaults cover every bundled check across the Sign-On, Application, User, System Log, and Identity Provider services: - `okta.policies.read` - `okta.brands.read` - `okta.apps.read` +- `okta.logStreams.read` +- `okta.idps.read` The service app must have these scopes granted in the **Okta API Scopes** tab. When the granted set is narrower than the requested set, the token request fails with an `invalid_scope` error and the scan stops at provider initialization. @@ -170,10 +175,10 @@ When additional checks are enabled — or when running against a service app tha ```bash # Environment variable — comma-separated -export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read,okta.users.read" +export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read,okta.logStreams.read,okta.idps.read,okta.users.read" # CLI flag — space-separated -prowler okta --okta-scopes okta.policies.read okta.brands.read okta.apps.read okta.users.read +prowler okta --okta-scopes okta.policies.read okta.brands.read okta.apps.read okta.logStreams.read okta.idps.read okta.users.read ``` For the full catalog of OAuth scopes exposed by the Okta Management API, refer to the [Okta OAuth 2.0 scopes documentation](https://developer.okta.com/docs/api/oauth2/). diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md index 6922846b0d..45cf485716 100644 --- a/prowler/CHANGELOG.md +++ b/prowler/CHANGELOG.md @@ -9,6 +9,7 @@ All notable changes to the **Prowler SDK** are documented in this file. - `sagemaker_models_monitor_enabled` check for AWS provider, verifying that each SageMaker monitoring schedule is in the `Scheduled` state so data and model drift is actively detected [(#11278)](https://github.com/prowler-cloud/prowler/pull/11278) - DORA (Digital Operational Resilience Act, Regulation (EU) 2022/2554) universal compliance framework with AWS provider coverage across the five DORA pillars [(#11131)](https://github.com/prowler-cloud/prowler/pull/11131) - `elbv2_alb_drop_invalid_header_fields_enabled` check for AWS provider, verifying Application Load Balancers have `routing.http.drop_invalid_header_fields.enabled` set to `true` to mitigate HTTP desync attacks (AWS FSBP ELB.4) [(#11471)](https://github.com/prowler-cloud/prowler/pull/11471) +- `user`, `systemlog` and `idp` service for Okta provider with `user_inactivity_automation_35d_enabled`, `systemlog_streaming_enabled` and `idp_smart_card_dod_approved_ca` checks [(#11496)](https://github.com/prowler-cloud/prowler/pull/11496) --- diff --git a/prowler/providers/okta/lib/arguments/arguments.py b/prowler/providers/okta/lib/arguments/arguments.py index 287f786b0f..4ed5cfa187 100644 --- a/prowler/providers/okta/lib/arguments/arguments.py +++ b/prowler/providers/okta/lib/arguments/arguments.py @@ -35,7 +35,8 @@ def init_parser(self): nargs="+", help=( "OAuth scopes to request, space-separated " - "(e.g. okta.policies.read okta.brands.read okta.apps.read). " + "(e.g. okta.policies.read okta.brands.read okta.apps.read " + "okta.logStreams.read okta.idps.read). " "Defaults to the read scopes required by the bundled checks." ), default=None, diff --git a/prowler/providers/okta/lib/service/pagination.py b/prowler/providers/okta/lib/service/pagination.py new file mode 100644 index 0000000000..a30bbf9477 --- /dev/null +++ b/prowler/providers/okta/lib/service/pagination.py @@ -0,0 +1,69 @@ +"""Shared pagination helpers for Okta SDK list calls. + +The Okta SDK exposes paginated list endpoints (`list_applications`, +`list_policies`, `list_log_streams`, `list_identity_providers`, …) that +return a tuple `(items, response, error)`. The next page is signalled +through an RFC 5988 `Link: <…>; rel="next"` header carrying an opaque +`after` cursor. + +These helpers are used by every Okta service that needs to drain a +paginated endpoint. They live here so we don't keep copy-pasting them +into each service module. +""" + +from typing import Optional +from urllib.parse import parse_qs, urlparse + + +def next_after_cursor(resp) -> Optional[str]: + """Extract the `after` cursor from a `Link: ...; rel="next"` header. + + Returns None when there is no next page. Header format follows RFC + 5988 and Okta's pagination guide. + """ + if resp is None: + return None + headers = getattr(resp, "headers", None) or {} + link = headers.get("link") or headers.get("Link") or "" + if not link: + return None + for part in link.split(","): + if 'rel="next"' not in part: + continue + url_segment = part.split(";", 1)[0].strip().lstrip("<").rstrip(">") + cursor = parse_qs(urlparse(url_segment).query).get("after", [None])[0] + if cursor: + return cursor + return None + + +async def paginate(fetch): + """Drain all pages of an SDK list call. + + `fetch` is a callable that accepts the `after` cursor (or None for + the first page) and returns the SDK's standard `(items, resp, err)` + tuple — or the 2-tuple early-error shape `(items, err)`. Follows the + `Link: rel="next"` header until exhausted. The returned tuple is + `(all_items, error)` — error is non-None only when a page fails + to fetch. + """ + all_items = [] + result = await fetch(None) + err = result[-1] + if err is not None: + return [], err + items = result[0] + resp = result[1] if len(result) >= 3 else None + all_items.extend(items or []) + while True: + cursor = next_after_cursor(resp) + if not cursor: + break + result = await fetch(cursor) + err = result[-1] + if err is not None: + return all_items, err + items = result[0] + resp = result[1] if len(result) >= 3 else None + all_items.extend(items or []) + return all_items, None diff --git a/prowler/providers/okta/lib/service/raw_fetch.py b/prowler/providers/okta/lib/service/raw_fetch.py new file mode 100644 index 0000000000..42e8eede11 --- /dev/null +++ b/prowler/providers/okta/lib/service/raw_fetch.py @@ -0,0 +1,141 @@ +"""Raw-JSON HTTP fetch via the Okta SDK's request executor. + +Some Okta Management API endpoints are not yet exposed as typed methods +on the SDK client (e.g. `/api/v1/automations`), or the typed path's +pydantic deserialization rejects values the API actually returns (e.g. +the `KnowledgeConstraint.types` lowercase issue we hit on +`list_policy_rules`). In both cases we go around the typed layer: +construct the request via `client._request_executor.create_request`, +execute without a response type, and parse the body ourselves. + +`get_json` returns the parsed JSON payload (typically a list or dict) +or raises with a descriptive log line on any of the failure modes — +request build, transport, decode, parse. `get_json_paginated` drains +list endpoints by following the `Link: rel="next"` cursor — without it, +the raw fallback would silently truncate at the per-request `limit`. +Callers are expected to project the JSON onto their own pydantic snapshot. +""" + +import json +from typing import Any, Optional +from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse + +from prowler.lib.logger import logger +from prowler.providers.okta.lib.service.pagination import next_after_cursor + + +async def get_json( + client, + path: str, + *, + accept: str = "application/json", + context: Optional[str] = None, +) -> Optional[Any]: + """GET `path` via the SDK's request executor and return parsed JSON. + + Returns the decoded JSON payload on success, or None when the + request, transport, or decode steps fail. Each failure path emits a + `logger.error` line tagged with `context` so the caller can grep + for it. + """ + label = context or path + request, error = await client._request_executor.create_request( + method="GET", + url=path, + body=None, + headers={"Accept": accept}, + ) + if error is not None: + logger.error(f"Raw fetch (create_request) failed for {label}: {error}") + return None + + _response, response_body, error = await client._request_executor.execute(request) + if error is not None: + logger.error(f"Raw fetch (execute) failed for {label}: {error}") + return None + + if isinstance(response_body, (bytes, bytearray)): + try: + response_body = response_body.decode("utf-8") + except UnicodeDecodeError as decode_err: + logger.error(f"Could not decode response for {label}: {decode_err}") + return None + try: + return json.loads(response_body) if response_body else None + except json.JSONDecodeError as decode_err: + logger.error(f"Could not parse JSON for {label}: {decode_err}") + return None + + +async def get_json_paginated( + client, + path: str, + *, + page_size: int = 200, + accept: str = "application/json", + context: Optional[str] = None, +) -> Optional[list]: + """Drain all pages of a raw-JSON list endpoint. + + Mirrors the typed `pagination.paginate` shape but operates on the + SDK's request executor directly. Follows the `Link: rel="next"` + header until exhausted, accumulating items across pages. Returns + the concatenated list, or None if any page fails to fetch or the + response is not a JSON array. + + `page_size` is appended as `limit=N` to the first request; subsequent + requests use the URL Okta returns via the cursor. + """ + label = context or path + all_items: list = [] + current_path = _set_query(path, {"limit": str(page_size)}) + while True: + request, error = await client._request_executor.create_request( + method="GET", + url=current_path, + body=None, + headers={"Accept": accept}, + ) + if error is not None: + logger.error(f"Raw fetch (create_request) failed for {label}: {error}") + return None + + response, response_body, error = await client._request_executor.execute(request) + if error is not None: + logger.error(f"Raw fetch (execute) failed for {label}: {error}") + return None + + if isinstance(response_body, (bytes, bytearray)): + try: + response_body = response_body.decode("utf-8") + except UnicodeDecodeError as decode_err: + logger.error(f"Could not decode response for {label}: {decode_err}") + return None + if not response_body: + break + try: + page = json.loads(response_body) + except json.JSONDecodeError as decode_err: + logger.error(f"Could not parse JSON for {label}: {decode_err}") + return None + if not isinstance(page, list): + logger.error( + f"Unexpected raw payload shape for {label}: " + f"{type(page).__name__}; expected list" + ) + return None + all_items.extend(page) + + cursor = next_after_cursor(response) + if not cursor: + break + current_path = _set_query(path, {"limit": str(page_size), "after": cursor}) + return all_items + + +def _set_query(path: str, params: dict) -> str: + """Return `path` with the given query params merged in (overriding existing).""" + parsed = urlparse(path) + qs = dict(parse_qsl(parsed.query)) + qs.update({k: v for k, v in params.items() if v is not None}) + return urlunparse(parsed._replace(query=urlencode(qs))) diff --git a/prowler/providers/okta/okta_provider.py b/prowler/providers/okta/okta_provider.py index 0a7b0b8e9d..678e9edbd4 100644 --- a/prowler/providers/okta/okta_provider.py +++ b/prowler/providers/okta/okta_provider.py @@ -32,7 +32,13 @@ from prowler.providers.okta.exceptions.exceptions import ( from prowler.providers.okta.lib.mutelist.mutelist import OktaMutelist from prowler.providers.okta.models import OktaIdentityInfo, OktaSession -DEFAULT_SCOPES = ["okta.policies.read", "okta.brands.read", "okta.apps.read"] +DEFAULT_SCOPES = [ + "okta.policies.read", + "okta.brands.read", + "okta.apps.read", + "okta.logStreams.read", + "okta.idps.read", +] # Accept only Okta-managed domains. Custom (vanity) domains are rejected on # purpose — they're a recurring source of typos and silent misconfig and # Prowler's audience overwhelmingly uses Okta-managed hosts. The TLDs below diff --git a/prowler/providers/okta/services/application/application_service.py b/prowler/providers/okta/services/application/application_service.py index de2fe0a658..fa6f483021 100644 --- a/prowler/providers/okta/services/application/application_service.py +++ b/prowler/providers/okta/services/application/application_service.py @@ -1,10 +1,13 @@ -import json from typing import Optional -from urllib.parse import parse_qs, urlparse +from urllib.parse import urlparse from pydantic import BaseModel, ValidationError from prowler.lib.logger import logger +from prowler.providers.okta.lib.service.pagination import paginate as _paginate_shared +from prowler.providers.okta.lib.service.raw_fetch import ( + get_json_paginated as _raw_get_json_paginated, +) from prowler.providers.okta.lib.service.service import OktaService # These three keys are Okta-platform constants, not tenant-configurable: @@ -28,29 +31,6 @@ DASHBOARD_APP_NAME = "okta_enduser" ADMIN_CONSOLE_FIRST_PARTY_APP_KEY = "admin-console" -def _next_after_cursor(resp) -> Optional[str]: - """Extract the `after` cursor from a `Link: ...; rel="next"` header. - - Returns None when there is no next page. Header format follows RFC 5988 - and Okta's pagination guide. Mirrors the helper in `signon_service` — - duplicated rather than shared until a third Okta service appears. - """ - if resp is None: - return None - headers = getattr(resp, "headers", None) or {} - link = headers.get("link") or headers.get("Link") or "" - if not link: - return None - for part in link.split(","): - if 'rel="next"' not in part: - continue - url_segment = part.split(";", 1)[0].strip().lstrip("<").rstrip(">") - cursor = parse_qs(urlparse(url_segment).query).get("after", [None])[0] - if cursor: - return cursor - return None - - REQUIRED_SCOPES: dict[str, str] = { "admin_console_app_settings": "okta.apps.read", "built_in_apps": "okta.apps.read", @@ -321,69 +301,24 @@ class Application(OktaService): """Raw-JSON fallback for `list_policy_rules`. Bypasses the Okta SDK's typed deserialization by calling the - request executor directly without a response type. The response - body is then `json.loads`-ed and projected onto our own pydantic - snapshot, which only validates the fields the STIG checks - actually read. This keeps the checks evaluable on tenants where - the Management API returns values the SDK validators reject. + request executor directly via the shared `get_json_paginated` + helper, which follows `Link: rel=next` so policies with more + rules than `rule_fetch_limit` are not silently truncated. + Projects the response onto our own pydantic snapshot which only + validates the fields the STIG checks actually read. This keeps + the checks evaluable on tenants where the Management API returns + values the SDK validators reject. """ - request, error = await self.client._request_executor.create_request( - method="GET", - url=f"/api/v1/policies/{policy_id}/rules?limit={rule_fetch_limit}", - body=None, - headers={"Accept": "application/json"}, + rules_data = await _raw_get_json_paginated( + self.client, + f"/api/v1/policies/{policy_id}/rules", + page_size=rule_fetch_limit, + context=f"access policy {policy_id} rules", ) - if error is not None: - logger.error( - f"Raw rules fetch (create_request) failed for {policy_id}: {error}" - ) + if rules_data is None: return AuthenticationPolicy( id=policy_id, name="", status="", is_default=False, rules=[] ) - - _response, response_body, error = await self.client._request_executor.execute( - request - ) - if error is not None: - logger.error(f"Raw rules fetch (execute) failed for {policy_id}: {error}") - return AuthenticationPolicy( - id=policy_id, name="", status="", is_default=False, rules=[] - ) - - if isinstance(response_body, (bytes, bytearray)): - try: - response_body = response_body.decode("utf-8") - except UnicodeDecodeError as decode_err: - logger.error( - f"Could not decode rules response for {policy_id}: {decode_err}" - ) - return AuthenticationPolicy( - id=policy_id, name="", status="", is_default=False, rules=[] - ) - try: - rules_data = json.loads(response_body) if response_body else [] - except json.JSONDecodeError as decode_err: - logger.error(f"Could not parse rules JSON for {policy_id}: {decode_err}") - return AuthenticationPolicy( - id=policy_id, name="", status="", is_default=False, rules=[] - ) - - if not isinstance(rules_data, list): - logger.error( - f"Unexpected raw rules payload shape for {policy_id}: " - f"got {type(rules_data).__name__}, expected list" - ) - return AuthenticationPolicy( - id=policy_id, name="", status="", is_default=False, rules=[] - ) - - if len(rules_data) >= rule_fetch_limit: - logger.warning( - f"Access policy {policy_id} returned {len(rules_data)} rules " - f"via raw-JSON fallback — the per-policy fetch limit " - f"({rule_fetch_limit}) was hit; any rules beyond this limit " - "are not evaluated by Prowler." - ) rules_out = [_raw_rule_to_model(rule) for rule in rules_data] return AuthenticationPolicy( id=policy_id, name="", status="", is_default=False, rules=rules_out @@ -391,33 +326,7 @@ class Application(OktaService): @staticmethod async def _paginate(fetch): - """Drain all pages of an SDK list call. - - `fetch` is a callable taking the `after` cursor (or None) and - returning the SDK's `(items, resp, err)` tuple. Follows the - `Link: rel="next"` header until exhausted. Mirrors the helper in - `signon_service`. - """ - all_items = [] - result = await fetch(None) - err = result[-1] - if err is not None: - return [], err - items = result[0] - resp = result[1] if len(result) >= 3 else None - all_items.extend(items or []) - while True: - cursor = _next_after_cursor(resp) - if not cursor: - break - result = await fetch(cursor) - err = result[-1] - if err is not None: - return all_items, err - items = result[0] - resp = result[1] if len(result) >= 3 else None - all_items.extend(items or []) - return all_items, None + return await _paginate_shared(fetch) def _policy_id_from_href(href: Optional[str]) -> Optional[str]: diff --git a/prowler/providers/okta/services/idp/__init__.py b/prowler/providers/okta/services/idp/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/idp/idp_client.py b/prowler/providers/okta/services/idp/idp_client.py new file mode 100644 index 0000000000..5bbf98c17a --- /dev/null +++ b/prowler/providers/okta/services/idp/idp_client.py @@ -0,0 +1,4 @@ +from prowler.providers.common.provider import Provider +from prowler.providers.okta.services.idp.idp_service import Idp + +idp_client = Idp(Provider.get_global_provider()) diff --git a/prowler/providers/okta/services/idp/idp_service.py b/prowler/providers/okta/services/idp/idp_service.py new file mode 100644 index 0000000000..2ff106cb47 --- /dev/null +++ b/prowler/providers/okta/services/idp/idp_service.py @@ -0,0 +1,118 @@ +from typing import Optional + +from pydantic import BaseModel + +from prowler.lib.logger import logger +from prowler.providers.okta.lib.service.pagination import paginate +from prowler.providers.okta.lib.service.service import OktaService + +# Okta's API value for the "Smart Card" IdP shown in the Admin Console. +# The UI label is "Smart Card IdP" but the `type` field on the API response +# is `X509` (Mutual TLS) — that is the value we filter on. +SMART_CARD_IDP_TYPE = "X509" + +REQUIRED_SCOPES: dict[str, str] = { + "identity_providers": "okta.idps.read", +} + + +class Idp(OktaService): + """Fetches Okta Identity Providers. + + Populates `self.identity_providers` keyed by IdP id. Each entry + captures the minimum fields the bundled checks read: identity + (`id`, `name`), `type`, `status`, and — for `X509` Smart Card IdPs + — the certificate-chain `issuer` and `kid` exposed by Okta's + `protocol.credentials.trust` structure. Reading the issuer DN lets + the check surface it for out-of-band verification against the + DOD-approved CA list. + + Required OAuth scopes (`REQUIRED_SCOPES`) are compared against the + access token's granted scopes (`provider.identity.granted_scopes`). + Missing scopes are recorded in `self.missing_scope` so the check + can emit an explicit MANUAL finding. + """ + + def __init__(self, provider): + super().__init__(__class__.__name__, provider) + granted = set(getattr(provider.identity, "granted_scopes", None) or []) + self.missing_scope: dict[str, Optional[str]] = { + resource: (scope if granted and scope not in granted else None) + for resource, scope in REQUIRED_SCOPES.items() + } + + self.identity_providers: dict[str, OktaIdentityProvider] = ( + {} + if self.missing_scope["identity_providers"] + else self._list_identity_providers() + ) + + def _list_identity_providers(self) -> dict: + logger.info("Idp - Listing Okta Identity Providers...") + try: + return self._run(self._fetch_identity_providers()) + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + return {} + + async def _fetch_identity_providers(self) -> dict: + result: dict[str, OktaIdentityProvider] = {} + all_idps, err = await paginate( + lambda after: self.client.list_identity_providers(after=after) + ) + if err is not None: + logger.error(f"Error listing identity providers: {err}") + return result + + for idp in all_idps: + idp_id = getattr(idp, "id", "") or "" + if not idp_id: + continue + issuer, kid = _trust_fields(idp) + result[idp_id] = OktaIdentityProvider( + id=idp_id, + name=getattr(idp, "name", "") or "", + type=_stringify_enum(getattr(idp, "type", None)) or "", + status=_stringify_enum(getattr(idp, "status", None)) or "", + trust_issuer=issuer, + trust_kid=kid, + ) + return result + + +def _trust_fields(idp) -> tuple[Optional[str], Optional[str]]: + """Extract `issuer` and `kid` from an `X509` IdP's protocol.credentials.trust. + + The SDK exposes `IdentityProvider.protocol` as `IdentityProviderProtocol`, + a Pydantic v2 oneOf wrapper that holds the concrete protocol (ProtocolMtls + for X509 IdPs) on `actual_instance`. `credentials` is not proxied on the + wrapper, so reading it directly returns None — we have to unwrap first. + """ + protocol = getattr(idp, "protocol", None) + if protocol is None: + return None, None + actual_protocol = getattr(protocol, "actual_instance", None) or protocol + credentials = getattr(actual_protocol, "credentials", None) + if credentials is None: + return None, None + trust = getattr(credentials, "trust", None) + if trust is None: + return None, None + return getattr(trust, "issuer", None), getattr(trust, "kid", None) + + +def _stringify_enum(value) -> Optional[str]: + if value is None: + return None + return getattr(value, "value", None) or str(value) + + +class OktaIdentityProvider(BaseModel): + id: str + name: str = "" + type: str = "" + status: str = "" + trust_issuer: Optional[str] = None + trust_kid: Optional[str] = None diff --git a/prowler/providers/okta/services/idp/idp_smart_card_dod_approved_ca/__init__.py b/prowler/providers/okta/services/idp/idp_smart_card_dod_approved_ca/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/idp/idp_smart_card_dod_approved_ca/idp_smart_card_dod_approved_ca.metadata.json b/prowler/providers/okta/services/idp/idp_smart_card_dod_approved_ca/idp_smart_card_dod_approved_ca.metadata.json new file mode 100644 index 0000000000..e7e85294be --- /dev/null +++ b/prowler/providers/okta/services/idp/idp_smart_card_dod_approved_ca/idp_smart_card_dod_approved_ca.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "okta", + "CheckID": "idp_smart_card_dod_approved_ca", + "CheckTitle": "Okta Smart Card (X509) Identity Provider uses a DOD-approved certificate authority", + "CheckType": [], + "ServiceName": "idp", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "NotDefined", + "ResourceGroup": "IAM", + "Description": "Every Okta Smart Card (X509) Identity Provider must be `ACTIVE` and its certificate chain must be issued by a DOD-approved CA. The check ships default issuer-DN patterns covering DOD PKI and ECA, and matches them against the chain's `issuer`. Override or extend via `okta_dod_approved_ca_issuer_patterns` in the audit config to recognise tenant-specific DOD CAs.", + "Risk": "An Okta Smart Card IdP whose certificate chain is not issued by a DOD-approved CA can be used to authenticate non-vetted identities.\n\n- **Trust on an unverified CA** allows impersonation of CAC/PIV holders\n- **Bypass of the federal PKI** required for DOD-grade identity assurance\n- **Acceptance of certificates** from a private or unaccredited issuer", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://help.okta.com/en-us/content/topics/security/idp-enable-smart-card.htm", + "https://developer.okta.com/docs/api/openapi/okta-management/management/tag/IdentityProvider/" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "1. Sign in to the **Okta Admin Console** as a *Super Admin*.\n2. Navigate to **Security** > **Identity Providers**.\n3. For each IdP whose **Type** is **Smart Card**, click **Actions** > **Configure**.\n4. Under **Certificate chain**, verify the certificate is from a DOD-approved Certificate Authority (DOD PKI, ECA, JITC, or equivalent).\n5. If the IdP is not **Active**, activate it once the chain is validated.", + "Terraform": "" + }, + "Recommendation": { + "Text": "Verify each Okta Smart Card (X509) Identity Provider is ACTIVE and its certificate chain is issued by a DOD-approved Certificate Authority. Document the issuer for audit evidence.", + "Url": "https://hub.prowler.com/check/idp_smart_card_dod_approved_ca" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "Aligns with DISA STIG V-273207 / OKTA-APP-001920." +} diff --git a/prowler/providers/okta/services/idp/idp_smart_card_dod_approved_ca/idp_smart_card_dod_approved_ca.py b/prowler/providers/okta/services/idp/idp_smart_card_dod_approved_ca/idp_smart_card_dod_approved_ca.py new file mode 100644 index 0000000000..5d19cca0dd --- /dev/null +++ b/prowler/providers/okta/services/idp/idp_smart_card_dod_approved_ca/idp_smart_card_dod_approved_ca.py @@ -0,0 +1,148 @@ +import re + +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.services.idp.idp_client import idp_client +from prowler.providers.okta.services.idp.idp_service import ( + SMART_CARD_IDP_TYPE, + OktaIdentityProvider, +) +from prowler.providers.okta.services.idp.lib.idp_helpers import ( + missing_idps_scope_finding, +) + +# Default issuer-DN substring patterns recognised as DOD-approved Certificate +# Authorities. The DOD PKI publishes canonical DN forms that include +# `O=U.S. Government, OU=DoD` (for DoD Root, DoD ID, DoD EMAIL, DoD SW, DoD +# JITC CAs) and `O=U.S. Government, OU=ECA` for the External Certificate +# Authorities. Customers running an internal CA outside these patterns can +# extend the list via the `okta_dod_approved_ca_issuer_patterns` audit-config +# entry — see the per-check Notes in metadata.json. +DEFAULT_DOD_CA_ISSUER_PATTERNS = ( + # `OU=DoD` is the distinctive DISA DN component for every CA in the DoD + # PKI (Root, ID, EMAIL, SW, JITC). `OU=ECA` is the equivalent for the + # External Certificate Authorities. The trailing `\b` prevents accidental + # matches against superstrings like `OU=DoDExtra`. + r"\bOU=DoD\b", + r"\bOU=ECA\b", +) + + +class idp_smart_card_dod_approved_ca(Check): + """Verifies that Okta Smart Card (X509) IdPs are configured and use a DOD-approved CA. + + PASS when the IdP is `ACTIVE` and its certificate chain's `issuer` + DN matches one of the configured DOD-approved CA patterns. MANUAL + when active but the issuer doesn't match (operator can verify + out-of-band or extend the pattern list). FAIL when no Smart Card + IdP is configured or when the configured IdP is inactive. + """ + + def execute(self) -> list[CheckReportOkta]: + findings: list[CheckReportOkta] = [] + org_domain = idp_client.provider.identity.org_domain + audit_config = idp_client.audit_config or {} + configured_patterns = audit_config.get("okta_dod_approved_ca_issuer_patterns") + patterns = ( + tuple(configured_patterns) + if configured_patterns + else DEFAULT_DOD_CA_ISSUER_PATTERNS + ) + + missing_scope = idp_client.missing_scope.get("identity_providers") + if missing_scope: + findings.append( + missing_idps_scope_finding(self.metadata(), org_domain, missing_scope) + ) + return findings + + smart_card_idps = [ + idp + for idp in idp_client.identity_providers.values() + if (idp.type or "").upper() == SMART_CARD_IDP_TYPE + ] + + if not smart_card_idps: + placeholder = OktaIdentityProvider( + id="okta-smart-card-idp-missing", + name="(no Smart Card IdP configured)", + type=SMART_CARD_IDP_TYPE, + status="MISSING", + ) + report = CheckReportOkta( + metadata=self.metadata(), resource=placeholder, org_domain=org_domain + ) + report.status = "FAIL" + report.status_extended = ( + "No Smart Card (X509) Identity Providers are configured. " + "Configure a Smart Card IdP in the Admin Console " + "(Security > Identity Providers) with a certificate chain " + "issued by a DOD-approved CA. If CAC/PIV authentication is " + "not required for this tenant, mutelist this check with " + "that documented exception." + ) + findings.append(report) + return findings + + for idp in smart_card_idps: + report = CheckReportOkta( + metadata=self.metadata(), resource=idp, org_domain=org_domain + ) + label = f"Okta Smart Card IdP '{idp.name}' (id={idp.id}, type={idp.type})" + chain_detail = _format_chain_detail(idp) + + if (idp.status or "").upper() != "ACTIVE": + report.status = "FAIL" + report.status_extended = ( + f"{label} is not ACTIVE (status={idp.status or 'unset'}). " + "Activate the IdP from Security > Identity Providers, then " + f"verify the certificate chain. {chain_detail}" + ) + findings.append(report) + continue + + matched_pattern = _matched_issuer_pattern(idp.trust_issuer, patterns) + if matched_pattern is not None: + report.status = "PASS" + report.status_extended = ( + f"{label} is ACTIVE and its chain issuer matches a " + f"DOD-approved CA pattern (`{matched_pattern}`). " + f"{chain_detail}" + ) + else: + report.status = "MANUAL" + report.status_extended = ( + f"{label} is ACTIVE but its chain issuer does not match any " + "configured DOD-approved CA pattern. Verify out-of-band " + "that the certificate chain belongs to a DOD-approved " + "Certificate Authority, or extend " + "`okta_dod_approved_ca_issuer_patterns` in the audit " + f"config. {chain_detail}" + ) + findings.append(report) + return findings + + +def _matched_issuer_pattern(issuer, patterns): + if not issuer: + return None + for pattern in patterns: + try: + if re.search(pattern, issuer): + return pattern + except re.error: + # Skip malformed operator-supplied patterns rather than crashing + # the whole check. + continue + return None + + +def _format_chain_detail(idp: OktaIdentityProvider) -> str: + if idp.trust_issuer or idp.trust_kid: + return ( + f"Chain issuer: {idp.trust_issuer or 'unset'}; " + f"kid: {idp.trust_kid or 'unset'}." + ) + return ( + "Chain issuer and kid were not exposed by the API; inspect the IdP in " + "the Admin Console under Security > Identity Providers > Configure." + ) diff --git a/prowler/providers/okta/services/idp/lib/__init__.py b/prowler/providers/okta/services/idp/lib/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/idp/lib/idp_helpers.py b/prowler/providers/okta/services/idp/lib/idp_helpers.py new file mode 100644 index 0000000000..f1689f34a1 --- /dev/null +++ b/prowler/providers/okta/services/idp/lib/idp_helpers.py @@ -0,0 +1,26 @@ +"""Shared helpers for the OKTA idp STIG checks.""" + +from prowler.lib.check.models import CheckReportOkta +from prowler.providers.okta.services.idp.idp_service import OktaIdentityProvider + + +def missing_idps_scope_finding( + metadata, org_domain: str, scope: str +) -> CheckReportOkta: + """Build the MANUAL finding when the IdPs scope is not granted.""" + placeholder = OktaIdentityProvider( + id="okta-idps-scope-missing", + name="(scope not granted)", + status="MISSING", + ) + report = CheckReportOkta( + metadata=metadata, resource=placeholder, org_domain=org_domain + ) + report.status = "MANUAL" + report.status_extended = ( + "Could not retrieve Okta Identity Providers: the Okta service app is " + f"missing the required `{scope}` API scope. Grant it on the service " + "app's Okta API Scopes tab in the Okta Admin Console, then re-run the " + "check." + ) + return report diff --git a/prowler/providers/okta/services/signon/signon_service.py b/prowler/providers/okta/services/signon/signon_service.py index 7c32363501..663e9cf187 100644 --- a/prowler/providers/okta/services/signon/signon_service.py +++ b/prowler/providers/okta/services/signon/signon_service.py @@ -1,34 +1,11 @@ from typing import Optional -from urllib.parse import parse_qs, urlparse from pydantic import BaseModel from prowler.lib.logger import logger +from prowler.providers.okta.lib.service.pagination import paginate as _paginate_shared from prowler.providers.okta.lib.service.service import OktaService - -def _next_after_cursor(resp) -> Optional[str]: - """Extract the `after` cursor from a `Link: ...; rel="next"` header. - - Returns None when there is no next page. Header format follows RFC 5988 - and Okta's pagination guide. - """ - if resp is None: - return None - headers = getattr(resp, "headers", None) or {} - link = headers.get("link") or headers.get("Link") or "" - if not link: - return None - for part in link.split(","): - if 'rel="next"' not in part: - continue - url_segment = part.split(";", 1)[0].strip().lstrip("<").rstrip(">") - cursor = parse_qs(urlparse(url_segment).query).get("after", [None])[0] - if cursor: - return cursor - return None - - REQUIRED_SCOPES: dict[str, str] = { "global_session_policies": "okta.policies.read", "sign_in_pages": "okta.brands.read", @@ -228,33 +205,7 @@ class Signon(OktaService): @staticmethod async def _paginate(fetch): - """Drain all pages of an SDK list call. - - `fetch` is a callable that takes the `after` cursor (or None for - the first page) and returns the SDK's standard `(items, resp, err)` - tuple. We follow `Link: rel="next"` headers until exhausted. - """ - all_items = [] - result = await fetch(None) - # Defensive against the SDK's 2-tuple early-error path: error is last. - err = result[-1] - if err is not None: - return [], err - items = result[0] - resp = result[1] if len(result) >= 3 else None - all_items.extend(items or []) - while True: - cursor = _next_after_cursor(resp) - if not cursor: - break - result = await fetch(cursor) - err = result[-1] - if err is not None: - return all_items, err - items = result[0] - resp = result[1] if len(result) >= 3 else None - all_items.extend(items or []) - return all_items, None + return await _paginate_shared(fetch) class GlobalSessionPolicyRule(BaseModel): diff --git a/prowler/providers/okta/services/systemlog/__init__.py b/prowler/providers/okta/services/systemlog/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/systemlog/lib/__init__.py b/prowler/providers/okta/services/systemlog/lib/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/systemlog/lib/systemlog_helpers.py b/prowler/providers/okta/services/systemlog/lib/systemlog_helpers.py new file mode 100644 index 0000000000..e82cedff5f --- /dev/null +++ b/prowler/providers/okta/services/systemlog/lib/systemlog_helpers.py @@ -0,0 +1,26 @@ +"""Shared helpers for the OKTA systemlog STIG checks.""" + +from prowler.lib.check.models import CheckReportOkta +from prowler.providers.okta.services.systemlog.systemlog_service import LogStream + + +def missing_log_streams_scope_finding( + metadata, org_domain: str, scope: str +) -> CheckReportOkta: + """Build the MANUAL finding when the log-streams scope is not granted.""" + placeholder = LogStream( + id="okta-log-streams-scope-missing", + name="(scope not granted)", + status="MISSING", + type="", + ) + report = CheckReportOkta( + metadata=metadata, resource=placeholder, org_domain=org_domain + ) + report.status = "MANUAL" + report.status_extended = ( + "Could not retrieve Okta Log Streams: the Okta service app is missing " + f"the required `{scope}` API scope. Grant it on the service app's " + "Okta API Scopes tab in the Okta Admin Console, then re-run the check." + ) + return report diff --git a/prowler/providers/okta/services/systemlog/systemlog_client.py b/prowler/providers/okta/services/systemlog/systemlog_client.py new file mode 100644 index 0000000000..3dc0a1a0af --- /dev/null +++ b/prowler/providers/okta/services/systemlog/systemlog_client.py @@ -0,0 +1,4 @@ +from prowler.providers.common.provider import Provider +from prowler.providers.okta.services.systemlog.systemlog_service import SystemLog + +systemlog_client = SystemLog(Provider.get_global_provider()) diff --git a/prowler/providers/okta/services/systemlog/systemlog_service.py b/prowler/providers/okta/services/systemlog/systemlog_service.py new file mode 100644 index 0000000000..96c8b6d7ae --- /dev/null +++ b/prowler/providers/okta/services/systemlog/systemlog_service.py @@ -0,0 +1,136 @@ +from typing import Optional + +from pydantic import BaseModel, ValidationError + +from prowler.lib.logger import logger +from prowler.providers.okta.lib.service.pagination import paginate +from prowler.providers.okta.lib.service.raw_fetch import ( + get_json_paginated as raw_get_json_paginated, +) +from prowler.providers.okta.lib.service.service import OktaService + +REQUIRED_SCOPES: dict[str, str] = { + "log_streams": "okta.logStreams.read", +} + + +class SystemLog(OktaService): + """Fetches Okta Log Stream configurations. + + Populates `self.log_streams` keyed by Log Stream id. Each entry + carries `name`, `status`, `type` — enough for the streaming-enabled + check to evaluate whether the tenant has off-loaded audit records + to an external SIEM/event bus. + + Required OAuth scopes (`REQUIRED_SCOPES`) are compared against the + access token's granted scopes (`provider.identity.granted_scopes`). + Missing scopes are recorded in `self.missing_scope` so the check + can emit an explicit MANUAL finding instead of a misleading + "no resources returned". + """ + + def __init__(self, provider): + super().__init__(__class__.__name__, provider) + granted = set(getattr(provider.identity, "granted_scopes", None) or []) + self.missing_scope: dict[str, Optional[str]] = { + resource: (scope if granted and scope not in granted else None) + for resource, scope in REQUIRED_SCOPES.items() + } + + self.log_streams: dict[str, LogStream] = ( + {} if self.missing_scope["log_streams"] else self._list_log_streams() + ) + + def _list_log_streams(self) -> dict: + logger.info("SystemLog - Listing Okta Log Streams...") + try: + return self._run(self._fetch_log_streams()) + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + return {} + + async def _fetch_log_streams(self) -> dict: + result: dict[str, LogStream] = {} + try: + all_streams, err = await paginate( + lambda after: self.client.list_log_streams(after=after) + ) + except ValidationError as ve: + # Upstream okta-sdk-python bug: e.g. `LogStreamSettingsAws`'s + # `eventSourceName` validator regex is `^[a-zA-Z0-9.\-_]$` — + # missing the `+` quantifier, so it rejects every + # multi-character name. Fall back to raw JSON so the check + # can still evaluate the tenant's actual log-stream state. + # Remove this workaround once okta-sdk-python fixes the + # validator (issue to be filed upstream). + logger.warning( + f"Okta SDK raised ValidationError parsing log streams " + f"({ve.error_count()} error(s)) — falling back to raw-JSON " + "parse. This is an okta-sdk-python deserialization bug." + ) + return await self._fetch_log_streams_raw() + + if err is not None: + logger.error(f"Error listing log streams: {err}") + return result + + for stream in all_streams: + stream_id = getattr(stream, "id", "") or "" + if not stream_id: + continue + result[stream_id] = LogStream( + id=stream_id, + name=getattr(stream, "name", "") or "", + status=getattr(stream, "status", "") or "", + type=_stringify_enum(getattr(stream, "type", None)) or "", + ) + return result + + async def _fetch_log_streams_raw(self) -> dict: + """Raw-JSON fallback for `list_log_streams`. + + Bypasses the SDK's typed deserialization via the shared + `get_json_paginated` helper (which follows the `Link: rel=next` + cursor so tenants with >200 streams are not silently truncated), + and projects the response onto our own pydantic snapshot which + only validates the four fields the check reads. Keeps the check + evaluable on tenants whose Log Stream settings happen to trip + an SDK enum/regex validator. + """ + result: dict[str, LogStream] = {} + data = await raw_get_json_paginated( + self.client, + "/api/v1/logStreams", + page_size=200, + context="log streams", + ) + if data is None: + return result + for item in data: + if not isinstance(item, dict): + continue + stream_id = item.get("id") + if not stream_id: + continue + result[stream_id] = LogStream( + id=stream_id, + name=item.get("name") or "", + status=(item.get("status") or "").upper(), + type=item.get("type") or "", + ) + return result + + +def _stringify_enum(value) -> Optional[str]: + if value is None: + return None + return getattr(value, "value", None) or str(value) + + +class LogStream(BaseModel): + id: str + name: str = "" + status: str = "" + type: str = "" diff --git a/prowler/providers/okta/services/systemlog/systemlog_streaming_enabled/__init__.py b/prowler/providers/okta/services/systemlog/systemlog_streaming_enabled/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/systemlog/systemlog_streaming_enabled/systemlog_streaming_enabled.metadata.json b/prowler/providers/okta/services/systemlog/systemlog_streaming_enabled/systemlog_streaming_enabled.metadata.json new file mode 100644 index 0000000000..27541ee2fe --- /dev/null +++ b/prowler/providers/okta/services/systemlog/systemlog_streaming_enabled/systemlog_streaming_enabled.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "okta", + "CheckID": "systemlog_streaming_enabled", + "CheckTitle": "Okta off-loads audit records to a central log server via Log Streaming", + "CheckType": [], + "ServiceName": "systemlog", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "high", + "ResourceType": "NotDefined", + "ResourceGroup": "monitoring", + "Description": "Okta must off-load audit records to a central log server. At least one **Log Stream** (AWS EventBridge, Splunk Cloud, etc.) must be configured and `ACTIVE` in the tenant. Alternatively, an external SIEM pulling the System Log API can satisfy the requirement, but that pull-based path is verified manually.", + "Risk": "Audit records stored only inside the Okta tenant are exposed to accidental or incidental deletion or alteration.\n\n- **No central retention** of authentication events for incident investigations\n- **Single point of failure** for the audit trail\n- **No correlation** with other identity, network, and endpoint telemetry in the SIEM", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://help.okta.com/en-us/content/topics/reports/log-streaming/about-log-streams.htm", + "https://developer.okta.com/docs/api/openapi/okta-management/management/tag/LogStream/" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "1. Sign in to the **Okta Admin Console** as a *Super Admin*.\n2. Navigate to **Reports** > **Log Streaming**.\n3. Click **Add Log Stream** and select **AWS EventBridge**, **Splunk Cloud**, or another supported destination.\n4. Complete the connection fields and save.\n5. Activate the stream and verify the destination receives events.\n6. If the destination SIEM is not natively supported, document the pull-based ingestion that uses the System Log API.", + "Terraform": "" + }, + "Recommendation": { + "Text": "Configure at least one ACTIVE Okta Log Stream that off-loads audit records to a central SIEM (AWS EventBridge, Splunk Cloud, or another supported destination). Document any alternative pull-based ingestion via the System Log API.", + "Url": "https://hub.prowler.com/check/systemlog_streaming_enabled" + } + }, + "Categories": [ + "logging" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "Aligns with DISA STIG V-273202 / OKTA-APP-001430." +} diff --git a/prowler/providers/okta/services/systemlog/systemlog_streaming_enabled/systemlog_streaming_enabled.py b/prowler/providers/okta/services/systemlog/systemlog_streaming_enabled/systemlog_streaming_enabled.py new file mode 100644 index 0000000000..61448aeaf5 --- /dev/null +++ b/prowler/providers/okta/services/systemlog/systemlog_streaming_enabled/systemlog_streaming_enabled.py @@ -0,0 +1,88 @@ +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.services.systemlog.lib.systemlog_helpers import ( + missing_log_streams_scope_finding, +) +from prowler.providers.okta.services.systemlog.systemlog_client import systemlog_client +from prowler.providers.okta.services.systemlog.systemlog_service import LogStream + + +class systemlog_streaming_enabled(Check): + """Verifies that at least one Okta Log Stream is configured and active. + + Off-loading audit records to a central SIEM (AWS EventBridge, Splunk + Cloud, etc.) is the standard mechanism for centralised retention. + An alternative path — pulling the System Log API into an external + SIEM — is allowed by the requirement, but cannot be verified + automatically; this check emits a MANUAL note in that case. + """ + + def execute(self) -> list[CheckReportOkta]: + findings: list[CheckReportOkta] = [] + org_domain = systemlog_client.provider.identity.org_domain + + missing_scope = systemlog_client.missing_scope.get("log_streams") + if missing_scope: + findings.append( + missing_log_streams_scope_finding( + self.metadata(), org_domain, missing_scope + ) + ) + return findings + + active_streams = [ + stream + for stream in systemlog_client.log_streams.values() + if not stream.status or stream.status.upper() == "ACTIVE" + ] + + if not systemlog_client.log_streams: + placeholder = LogStream( + id="okta-log-streams-missing", + name="(no Log Streams configured)", + status="MISSING", + type="", + ) + report = CheckReportOkta( + metadata=self.metadata(), resource=placeholder, org_domain=org_domain + ) + report.status = "FAIL" + report.status_extended = ( + "No Okta Log Streams are configured. Configure a Log Stream " + "(Reports > Log Streaming) to off-load audit records to a " + "central SIEM. If an external SIEM is already pulling logs " + "via the System Log API, mutelist this check with that " + "evidence." + ) + findings.append(report) + return findings + + if not active_streams: + placeholder = LogStream( + id="okta-log-streams-inactive", + name="(no active Log Streams)", + status="INACTIVE", + type="", + ) + report = CheckReportOkta( + metadata=self.metadata(), resource=placeholder, org_domain=org_domain + ) + report.status = "FAIL" + report.status_extended = ( + f"{len(systemlog_client.log_streams)} Okta Log Stream(s) are " + "configured but none are ACTIVE. Activate a Log Stream to " + "off-load audit records to a central SIEM." + ) + findings.append(report) + return findings + + for stream in active_streams: + report = CheckReportOkta( + metadata=self.metadata(), resource=stream, org_domain=org_domain + ) + report.status = "PASS" + report.status_extended = ( + f"Okta Log Stream '{stream.name}' (type={stream.type or 'unset'}) " + "is ACTIVE and off-loads audit records to a central SIEM." + ) + findings.append(report) + return findings diff --git a/prowler/providers/okta/services/user/__init__.py b/prowler/providers/okta/services/user/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/user/lib/__init__.py b/prowler/providers/okta/services/user/lib/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/user/lib/user_helpers.py b/prowler/providers/okta/services/user/lib/user_helpers.py new file mode 100644 index 0000000000..423801f2d8 --- /dev/null +++ b/prowler/providers/okta/services/user/lib/user_helpers.py @@ -0,0 +1,26 @@ +"""Shared helpers for the OKTA user STIG checks.""" + +from prowler.lib.check.models import CheckReportOkta +from prowler.providers.okta.services.user.user_service import UserAutomation + + +def missing_user_scope_finding( + metadata, org_domain: str, scope: str +) -> CheckReportOkta: + """Build the MANUAL finding when an OAuth scope is not granted.""" + placeholder = UserAutomation( + id="okta-user-scope-missing", + name="(scope not granted)", + status="MISSING", + ) + report = CheckReportOkta( + metadata=metadata, resource=placeholder, org_domain=org_domain + ) + report.status = "MANUAL" + report.status_extended = ( + f"Could not retrieve Okta user lifecycle automations: the Okta service " + f"app is missing the required `{scope}` API scope. Grant it on the " + "service app's Okta API Scopes tab in the Okta Admin Console, then " + "re-run the check." + ) + return report diff --git a/prowler/providers/okta/services/user/user_client.py b/prowler/providers/okta/services/user/user_client.py new file mode 100644 index 0000000000..9a49fbdbac --- /dev/null +++ b/prowler/providers/okta/services/user/user_client.py @@ -0,0 +1,4 @@ +from prowler.providers.common.provider import Provider +from prowler.providers.okta.services.user.user_service import User + +user_client = User(Provider.get_global_provider()) diff --git a/prowler/providers/okta/services/user/user_inactivity_automation_35d_enabled/__init__.py b/prowler/providers/okta/services/user/user_inactivity_automation_35d_enabled/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/user/user_inactivity_automation_35d_enabled/user_inactivity_automation_35d_enabled.metadata.json b/prowler/providers/okta/services/user/user_inactivity_automation_35d_enabled/user_inactivity_automation_35d_enabled.metadata.json new file mode 100644 index 0000000000..64f24d95ae --- /dev/null +++ b/prowler/providers/okta/services/user/user_inactivity_automation_35d_enabled/user_inactivity_automation_35d_enabled.metadata.json @@ -0,0 +1,36 @@ +{ + "Provider": "okta", + "CheckID": "user_inactivity_automation_35d_enabled", + "CheckTitle": "Okta automation suspends or deactivates users after 35 days of inactivity", + "CheckType": [], + "ServiceName": "user", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "NotDefined", + "ResourceGroup": "IAM", + "Description": "An Okta **Workflows Automation** must disable inactive user accounts. The automation must be `ACTIVE`, on an `ACTIVE` schedule, evaluate `User Inactivity = 35 days` (or less), apply to a group covering every user, and trigger `Suspended` / `Deactivated` / `Deprovisioned`. Threshold override: `okta_user_inactivity_max_days`. N/A when user sourcing is delegated to Active Directory or LDAP.", + "Risk": "Inactive Okta accounts retained indefinitely give an attacker who exploits one undetected access to downstream applications.\n\n- **Account takeover via dormant identities** that no one is monitoring\n- **Lateral movement** through SSO sessions of forgotten users\n- **Stale entitlements** that survive role and policy reorganisations", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://help.okta.com/en-us/content/topics/automation-hooks/automations-main.htm" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "1. Sign in to the **Okta Admin Console** as a *Super Admin*.\n2. Navigate to **Workflow** > **Automations** and click **Add Automation**.\n3. Name the automation (e.g., `User Inactivity`).\n4. Add a condition: select **User Inactivity in Okta** and enter `35` days.\n5. Configure the schedule to run daily and activate it.\n6. Apply the automation to a group that covers every user — typically `Everyone`.\n7. Add an action: **Change User lifecycle state in Okta** and choose `Suspended` (or `Deactivated`/`Deprovisioned`).\n8. Activate the automation.", + "Terraform": "" + }, + "Recommendation": { + "Text": "Create an active Okta Workflows automation that runs daily, evaluates `User Inactivity in Okta = 35 days`, applies to a group covering every user, and changes the user lifecycle state to Suspended/Deactivated. If user sourcing is delegated to Active Directory or LDAP, document that the connected directory enforces this requirement instead.", + "Url": "https://hub.prowler.com/check/user_inactivity_automation_35d_enabled" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "Aligns with DISA STIG V-273188 / OKTA-APP-000090." +} diff --git a/prowler/providers/okta/services/user/user_inactivity_automation_35d_enabled/user_inactivity_automation_35d_enabled.py b/prowler/providers/okta/services/user/user_inactivity_automation_35d_enabled/user_inactivity_automation_35d_enabled.py new file mode 100644 index 0000000000..79e77c3f73 --- /dev/null +++ b/prowler/providers/okta/services/user/user_inactivity_automation_35d_enabled/user_inactivity_automation_35d_enabled.py @@ -0,0 +1,204 @@ +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.services.user.lib.user_helpers import ( + missing_user_scope_finding, +) +from prowler.providers.okta.services.user.user_client import user_client +from prowler.providers.okta.services.user.user_service import UserAutomation + +DEFAULT_INACTIVITY_DAYS = 35 +SUSPENSION_LIFECYCLE_ACTIONS = {"SUSPENDED", "DEACTIVATED", "DEPROVISIONED"} + + +class user_inactivity_automation_35d_enabled(Check): + """Verifies that Okta suspends/deactivates users after 35 days of inactivity. + + A Workflows Automation must exist with: + - status ACTIVE, + - schedule active, + - condition `User Inactivity in Okta = 35 days`, + - action that changes the user state to Suspended / Deactivated, + - applied to a group covering every user (typically `Everyone`). + + When user sourcing is delegated to an external directory (Active + Directory or LDAP), the requirement is N/A on the Okta side — the + connected directory is expected to enforce inactivity-based + deactivation instead. Threshold override: + `okta_user_inactivity_max_days` in the audit config. + """ + + def execute(self) -> list[CheckReportOkta]: + findings: list[CheckReportOkta] = [] + audit_config = user_client.audit_config or {} + threshold_days = audit_config.get( + "okta_user_inactivity_max_days", DEFAULT_INACTIVITY_DAYS + ) + org_domain = user_client.provider.identity.org_domain + + for scope_key in ("automations", "identity_providers"): + missing_scope = user_client.missing_scope.get(scope_key) + if missing_scope: + findings.append( + missing_user_scope_finding( + self.metadata(), org_domain, missing_scope + ) + ) + return findings + + # External-directory N/A path. + if user_client.external_directory_idps: + idp_names = ", ".join( + f"'{idp.name}' (type={idp.type})" + for idp in user_client.external_directory_idps.values() + ) + placeholder = UserAutomation( + id="okta-user-inactivity-na-external-directory", + name="(external directory enforces inactivity)", + status="N/A", + ) + report = CheckReportOkta( + metadata=self.metadata(), + resource=placeholder, + org_domain=org_domain, + ) + report.status = "MANUAL" + report.status_extended = ( + "User sourcing is delegated to an external directory " + f"({idp_names}). The 35-day inactivity disable requirement is " + "expected to be enforced by the connected directory rather " + "than by an Okta automation. Confirm out-of-band that the " + "external directory disables accounts after " + f"{threshold_days} days of inactivity." + ) + findings.append(report) + return findings + + compliant_automations = [ + automation + for automation in user_client.automations.values() + if _is_compliant(automation, threshold_days) + ] + + if not user_client.automations: + placeholder = UserAutomation( + id="okta-user-inactivity-no-automations", + name="(no automations configured)", + status="MISSING", + ) + report = CheckReportOkta( + metadata=self.metadata(), + resource=placeholder, + org_domain=org_domain, + ) + report.status = "FAIL" + report.status_extended = ( + "No Okta Workflows automations are configured. Create an " + "automation that suspends or deactivates users after " + f"{threshold_days} days of inactivity, scoped to a group " + "covering every user (typically 'Everyone'), with an active " + "schedule." + ) + findings.append(report) + return findings + + if compliant_automations: + for automation in compliant_automations: + report = CheckReportOkta( + metadata=self.metadata(), + resource=automation, + org_domain=org_domain, + ) + report.status = "PASS" + groups_label = ", ".join(automation.applies_to_groups) + report.status_extended = ( + f"Okta automation '{automation.name}' is ACTIVE with an " + f"active schedule, triggers after " + f"{automation.inactivity_days} days of inactivity, and " + f"changes the user state to " + f"{automation.lifecycle_action or 'unset'}. " + f"Applied to group(s): {groups_label}. Verify that these " + "group(s) cover every user. Okta has no built-in " + "'Everyone' group ID, so tenant-wide coverage cannot be " + "asserted automatically." + ) + findings.append(report) + return findings + + # Automations exist but none satisfy the predicate — surface the + # closest candidate for the auditor. + candidate = _closest_candidate(user_client.automations.values()) + report = CheckReportOkta( + metadata=self.metadata(), + resource=candidate + or UserAutomation( + id="okta-user-inactivity-noncompliant", + name="(no compliant automation)", + status="MISSING", + ), + org_domain=org_domain, + ) + report.status = "FAIL" + report.status_extended = _failure_message(candidate, threshold_days) + findings.append(report) + return findings + + +def _is_compliant(automation: UserAutomation, threshold_days: int) -> bool: + # `applies_to_groups` must be non-empty — Okta USER_LIFECYCLE policies + # do not implicitly cover every user; the scope is whatever group IDs + # the operator put in `people.groups.include`. An empty scope means + # the automation runs against nobody. Operator must still verify those + # group(s) cover the intended user population (surfaced in the PASS + # status_extended). + return bool( + automation.status.upper() == "ACTIVE" + and automation.schedule_status.upper() == "ACTIVE" + and automation.inactivity_days is not None + and automation.inactivity_days <= threshold_days + and (automation.lifecycle_action or "").upper() in SUSPENSION_LIFECYCLE_ACTIONS + and bool(automation.applies_to_groups) + ) + + +def _closest_candidate(automations): + automations = list(automations) + if not automations: + return None + automations.sort( + key=lambda a: ( + 0 if a.status.upper() == "ACTIVE" else 1, + 0 if a.schedule_status.upper() == "ACTIVE" else 1, + ( + abs(a.inactivity_days - DEFAULT_INACTIVITY_DAYS) + if a.inactivity_days is not None + else 10_000 + ), + a.name, + ) + ) + return automations[0] + + +def _failure_message(automation, threshold_days): + if automation is None: + return f"No Okta automation enforces {threshold_days}-day inactivity disable." + issues = [] + if automation.status.upper() != "ACTIVE": + issues.append(f"status {automation.status or 'unset'}") + if automation.schedule_status.upper() != "ACTIVE": + issues.append(f"schedule {automation.schedule_status or 'unset'}") + if automation.inactivity_days is None: + issues.append("no inactivity condition") + elif automation.inactivity_days > threshold_days: + issues.append( + f"inactivity {automation.inactivity_days}d (max {threshold_days}d)" + ) + action = (automation.lifecycle_action or "").upper() + if action not in SUSPENSION_LIFECYCLE_ACTIONS: + issues.append(f"action {automation.lifecycle_action or 'unset'}") + if not automation.applies_to_groups: + issues.append("no group scope") + detail = ", ".join(issues) if issues else "incomplete" + return ( + f"Okta automation '{automation.name}' fails {threshold_days}d " + f"inactivity: {detail}." + ) diff --git a/prowler/providers/okta/services/user/user_service.py b/prowler/providers/okta/services/user/user_service.py new file mode 100644 index 0000000000..c816bd38ee --- /dev/null +++ b/prowler/providers/okta/services/user/user_service.py @@ -0,0 +1,455 @@ +from typing import Optional + +from pydantic import BaseModel, ValidationError + +from prowler.lib.logger import logger +from prowler.providers.okta.lib.service.pagination import paginate +from prowler.providers.okta.lib.service.raw_fetch import ( + get_json_paginated as raw_get_json_paginated, +) +from prowler.providers.okta.lib.service.service import OktaService + +# External-directory IdP `type` values that delegate user sourcing to a +# separate identity store. When any of these is present and ACTIVE, the +# STIG's 35-day inactivity disable requirement is N/A on the Okta side — +# the connected directory is expected to enforce it instead. +EXTERNAL_DIRECTORY_IDP_TYPES = {"ACTIVE_DIRECTORY", "LDAP"} + +# Okta exposes "Workflow > Automations" as USER_LIFECYCLE policies with +# inactivity rule conditions, not as a standalone `/api/v1/automations` +# resource. The SDK's `UserPolicyRuleCondition.inactivity` and +# `ScheduledUserLifecycleAction` models confirm this; the API rejects +# every other `type` candidate. +USER_LIFECYCLE_POLICY_TYPE = "USER_LIFECYCLE" + +REQUIRED_SCOPES: dict[str, str] = { + "automations": "okta.policies.read", + "identity_providers": "okta.idps.read", +} + + +class User(OktaService): + """Fetches Okta User Lifecycle Automations and external-directory IdPs. + + Populates: + - `self.automations` — keyed by USER_LIFECYCLE policy rule id. Each + entry projects the fields the 35-day inactivity check evaluates: + identity (`id`, `name` — taken from the rule), `status`, + `schedule_status` (inherited from the parent policy), the + `inactivity_days` condition and `applies_to_groups` scope from the + parent policy, and the `lifecycle_action` from the rule. + - `self.external_directory_idps` — keyed by IdP id. Used to short + circuit the STIG to N/A when user sourcing is delegated to an + external directory (Active Directory, LDAP). + + The Okta Admin Console's "Workflow > Automations" page is rendered + on top of `USER_LIFECYCLE` policies in the Management API + (`list_policies(type='USER_LIFECYCLE')` + `list_policy_rules(...)`). + There is no standalone `/api/v1/automations` GET endpoint; the SDK's + `InactivityPolicyRuleCondition`, `UserPolicyRuleCondition`, and + `ScheduledUserLifecycleAction` models all hang off the policy API. + + Required OAuth scopes (`REQUIRED_SCOPES`) are compared against the + access token's granted scopes (`provider.identity.granted_scopes`). + Missing scopes are recorded in `self.missing_scope` so the check + can emit an explicit MANUAL finding. + """ + + def __init__(self, provider): + super().__init__(__class__.__name__, provider) + granted = set(getattr(provider.identity, "granted_scopes", None) or []) + self.missing_scope: dict[str, Optional[str]] = { + resource: (scope if granted and scope not in granted else None) + for resource, scope in REQUIRED_SCOPES.items() + } + + self.automations: dict[str, UserAutomation] = ( + {} if self.missing_scope["automations"] else self._list_automations() + ) + self.external_directory_idps: dict[str, ExternalDirectoryIdp] = ( + {} + if self.missing_scope["identity_providers"] + else self._list_external_directory_idps() + ) + + def _list_automations(self) -> dict: + logger.info("User - Listing USER_LIFECYCLE policies and rules...") + try: + return self._run(self._fetch_automations()) + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + return {} + + async def _fetch_automations(self) -> dict: + result: dict[str, UserAutomation] = {} + try: + all_policies, err = await paginate( + lambda after: self.client.list_policies( + type=USER_LIFECYCLE_POLICY_TYPE, after=after + ) + ) + except (ValueError, ValidationError) as ex: + # Upstream okta-sdk-python bug: `Policy.from_dict` uses a + # discriminator dispatch that maps `type` → concrete Policy + # subclass, and `USER_LIFECYCLE` is not in the map. The SDK + # raises ValueError ("failed to lookup discriminator value") + # even though the API returns a valid policy. Fall back to + # raw JSON. Remove once okta-sdk-python adds + # USER_LIFECYCLE → UserLifecyclePolicy to the mapping. + logger.warning( + f"Okta SDK raised {type(ex).__name__} parsing USER_LIFECYCLE " + "policies — falling back to raw-JSON parse. This is an " + "okta-sdk-python deserialization bug " + "(missing discriminator mapping)." + ) + return await self._fetch_automations_raw() + + if err is not None: + logger.error(f"Error listing USER_LIFECYCLE policies: {err}") + return result + + for policy in all_policies: + policy_id = getattr(policy, "id", "") or "" + if not policy_id: + continue + policy_status = _stringify_enum(getattr(policy, "status", None)) or "" + policy_name = getattr(policy, "name", "") or "" + rules = await self._fetch_rules(policy_id) + if rules is None: + # Rule typed parsing tripped an SDK validator. Re-run the + # whole automation discovery via raw JSON so we don't lose + # the rule data for this — or any other — policy. Cheaper + # than mixing typed and raw projections. + logger.warning( + f"Rule typed parsing failed for USER_LIFECYCLE policy " + f"{policy_id} — re-running all automations via raw-JSON." + ) + return await self._fetch_automations_raw() + if not rules: + # A policy with no rules exists in the Admin Console UI as + # an "Automation" the operator hasn't finished configuring + # (no conditions, no actions). Emit a placeholder so the + # check FAILs with a specific message naming every missing + # piece, instead of pretending the policy doesn't exist. + result[policy_id] = _shell_automation( + policy_id, policy_name, policy_status + ) + continue + for rule in rules: + automation = _rule_to_automation(rule, policy) + if automation is None: + continue + result[automation.id] = automation + return result + + async def _fetch_rules(self, policy_id: str) -> Optional[list]: + """Return the policy's typed rules, or None to signal raw fallback. + + The Okta SDK's `list_policy_rules` shares the same brittle typed + deserialization as `list_policies` (strict pydantic validators + rejecting values the API actually returns). When that happens the + caller can't reuse any of the typed projection for this policy — + we return None as a sentinel and the caller re-runs the whole + discovery via `_fetch_automations_raw`. Returning `[]` would + otherwise misclassify the policy as an "unfinished automation" + and FAIL it. + """ + rule_fetch_limit = 100 + try: + result = await self.client.list_policy_rules( + policy_id, limit=str(rule_fetch_limit) + ) + except (ValueError, ValidationError) as ex: + logger.warning( + f"Okta SDK raised {type(ex).__name__} parsing rules for " + f"USER_LIFECYCLE policy {policy_id} — signaling raw fallback." + ) + return None + err = result[-1] + if err is not None: + logger.error( + f"Error listing rules for USER_LIFECYCLE policy {policy_id}: {err}" + ) + return [] + rules = list(result[0] or []) + if len(rules) >= rule_fetch_limit: + logger.warning( + f"USER_LIFECYCLE policy {policy_id} returned {len(rules)} rules — " + f"the per-policy fetch limit ({rule_fetch_limit}) was hit; any " + "rules beyond this limit are not evaluated." + ) + return rules + + async def _fetch_automations_raw(self) -> dict: + """Raw-JSON fallback for `list_policies(type='USER_LIFECYCLE')`. + + Bypasses the SDK's typed deserialization via the shared + `get_json_paginated` helper, then drains each policy's rules + via the same path. Projects everything onto our `UserAutomation` + snapshot which only validates the fields the check reads. + """ + result: dict[str, UserAutomation] = {} + policies_data = await raw_get_json_paginated( + self.client, + f"/api/v1/policies?type={USER_LIFECYCLE_POLICY_TYPE}", + page_size=200, + context="USER_LIFECYCLE policies", + ) + if policies_data is None: + return result + + for policy_dict in policies_data: + if not isinstance(policy_dict, dict): + continue + policy_id = policy_dict.get("id") + if not policy_id: + continue + policy_status = (policy_dict.get("status") or "").upper() + policy_name = policy_dict.get("name") or "" + + rules_data = await raw_get_json_paginated( + self.client, + f"/api/v1/policies/{policy_id}/rules", + page_size=100, + context=f"USER_LIFECYCLE policy {policy_id} rules", + ) + if not rules_data: + # No rules under the policy → emit placeholder. Same + # rationale as the typed path: surface the unfinished + # automation so the check can name what's missing. + result[policy_id] = _shell_automation( + policy_id, policy_name, policy_status + ) + continue + for rule_dict in rules_data: + automation = _raw_rule_to_automation( + rule_dict, policy_dict, policy_id, policy_name, policy_status + ) + if automation is None: + continue + result[automation.id] = automation + return result + + def _list_external_directory_idps(self) -> dict: + logger.info("User - Listing Okta IdPs for external-directory detection...") + try: + return self._run(self._fetch_external_directory_idps()) + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + return {} + + async def _fetch_external_directory_idps(self) -> dict: + result: dict[str, ExternalDirectoryIdp] = {} + all_idps, err = await paginate( + lambda after: self.client.list_identity_providers(after=after) + ) + if err is not None: + logger.error(f"Error listing identity providers: {err}") + return result + + for idp in all_idps: + idp_type = _stringify_enum(getattr(idp, "type", None)) or "" + if idp_type.upper() not in EXTERNAL_DIRECTORY_IDP_TYPES: + continue + idp_status = _stringify_enum(getattr(idp, "status", None)) or "" + if idp_status.upper() != "ACTIVE": + continue + idp_id = getattr(idp, "id", "") or "" + if not idp_id: + continue + result[idp_id] = ExternalDirectoryIdp( + id=idp_id, + name=getattr(idp, "name", "") or "", + type=idp_type, + status=idp_status, + ) + return result + + +def _rule_to_automation(rule, policy) -> Optional["UserAutomation"]: + """Project a typed USER_LIFECYCLE policy + rule pair onto our snapshot. + + Important: in the actual API response, an Okta "Automation" is split + across two resources — the **inactivity condition + group scope** + live on the *policy* (`policy.conditions.people.users.inactivity`, + `policy.conditions.people.groups.include`), and the **lifecycle + action** lives on the *rule* (`rule.actions.user_lifecycle.action` + on the typed model; `updateUserLifecycle.targetStatus` on raw JSON). + The rule's own `conditions` is typically empty. Projecting requires + both — kept aligned with `_raw_rule_to_automation` so the two paths + yield identical snapshots. + """ + rule_id = getattr(rule, "id", "") or "" + if not rule_id: + return None + + policy_id = getattr(policy, "id", "") or "" + policy_name = getattr(policy, "name", "") or "" + policy_status = (_stringify_enum(getattr(policy, "status", None)) or "").upper() + + # Inactivity + groups live on the POLICY in the API response. + inactivity_days: Optional[int] = None + applies_to_groups: list[str] = [] + conditions = getattr(policy, "conditions", None) + people = getattr(conditions, "people", None) if conditions else None + users = getattr(people, "users", None) if people else None + inactivity = getattr(users, "inactivity", None) if users else None + if inactivity is not None: + number = getattr(inactivity, "number", None) + unit = (_stringify_enum(getattr(inactivity, "unit", None)) or "").upper() + if isinstance(number, int) and unit in {"DAYS", "DAY"}: + inactivity_days = number + groups = getattr(people, "groups", None) if people else None + include_groups = getattr(groups, "include", None) if groups else None + if include_groups: + applies_to_groups = [str(g) for g in include_groups if g] + + # Lifecycle action lives on the RULE. + actions = getattr(rule, "actions", None) + user_lifecycle = ( + getattr(actions, "user_lifecycle", None) if actions else None + ) or (getattr(actions, "userLifecycle", None) if actions else None) + lifecycle_action: Optional[str] = None + if user_lifecycle is not None: + for attr in ("action", "status"): + value = _stringify_enum(getattr(user_lifecycle, attr, None)) + if value: + lifecycle_action = value.upper() + break + + rule_name = getattr(rule, "name", "") or policy_name or "(unnamed)" + rule_status = _stringify_enum(getattr(rule, "status", None)) or "" + + return UserAutomation( + id=rule_id, + name=rule_name, + status=rule_status.upper(), + schedule_status=policy_status, + inactivity_days=inactivity_days, + lifecycle_action=lifecycle_action, + applies_to_groups=applies_to_groups, + policy_id=policy_id, + policy_name=policy_name, + ) + + +def _raw_rule_to_automation( + rule_dict, + policy_dict, + policy_id: str, + policy_name: str, + policy_status: str, +) -> Optional["UserAutomation"]: + """Project a raw USER_LIFECYCLE policy+rule pair onto our snapshot. + + Important: in the actual API response, an Okta "Automation" is split + across two resources — the **inactivity condition + group scope** + live on the *policy* (`policy.conditions.people.users.inactivity`, + `policy.conditions.people.groups.include`), and the **lifecycle + action** lives on the *rule* + (`rule.actions.updateUserLifecycle.targetStatus`). The rule's own + `conditions` is typically empty `{}`. Projecting requires both. + + Schedule isn't exposed by the API on either resource. Okta runs an + automation on its UI-configured schedule iff the policy is ACTIVE, + so we treat `policy.status` as the schedule proxy. + """ + if not isinstance(rule_dict, dict): + return None + rule_id = rule_dict.get("id") + if not rule_id: + return None + + # Inactivity + groups live on the POLICY in the API response. + inactivity_days: Optional[int] = None + applies_to_groups: list[str] = [] + if isinstance(policy_dict, dict): + policy_conditions = policy_dict.get("conditions") or {} + people = policy_conditions.get("people") or {} + users = people.get("users") or {} + inactivity = users.get("inactivity") + if isinstance(inactivity, dict): + number = inactivity.get("number") + unit = (inactivity.get("unit") or "").upper() + if isinstance(number, int) and unit in {"DAYS", "DAY"}: + inactivity_days = number + groups = people.get("groups") or {} + include_groups = groups.get("include") + if isinstance(include_groups, list): + applies_to_groups = [str(g) for g in include_groups if g] + + # Lifecycle action lives on the RULE under + # `actions.updateUserLifecycle.targetStatus` (the API uses + # "updateUserLifecycle" rather than the SDK's `user_lifecycle`). + rule_actions = rule_dict.get("actions") or {} + update_user_lifecycle = rule_actions.get("updateUserLifecycle") or {} + lifecycle_action: Optional[str] = None + if isinstance(update_user_lifecycle, dict): + target = update_user_lifecycle.get("targetStatus") + if isinstance(target, str) and target: + lifecycle_action = target.upper() + + return UserAutomation( + id=rule_id, + name=(rule_dict.get("name") or policy_name or "(unnamed)"), + status=(rule_dict.get("status") or "").upper(), + schedule_status=policy_status, + inactivity_days=inactivity_days, + lifecycle_action=lifecycle_action, + applies_to_groups=applies_to_groups, + policy_id=policy_id, + policy_name=policy_name, + ) + + +def _shell_automation( + policy_id: str, policy_name: str, policy_status: str +) -> "UserAutomation": + """Placeholder UserAutomation for a USER_LIFECYCLE policy with no rules. + + Surfaces the unfinished automation in `self.automations` so the check + can list every missing piece in its FAIL message (no inactivity + condition, no lifecycle action, status inactive, etc.) instead of + silently dropping the policy. + """ + upper_status = (policy_status or "").upper() + return UserAutomation( + id=policy_id, + name=policy_name or "(unnamed automation)", + status=upper_status, + schedule_status=upper_status, + inactivity_days=None, + lifecycle_action=None, + applies_to_groups=[], + policy_id=policy_id, + policy_name=policy_name, + ) + + +def _stringify_enum(value) -> Optional[str]: + if value is None: + return None + return getattr(value, "value", None) or str(value) + + +class UserAutomation(BaseModel): + id: str + name: str = "" + status: str = "" + schedule_status: str = "" + inactivity_days: Optional[int] = None + lifecycle_action: Optional[str] = None + applies_to_groups: list[str] = [] + policy_id: str = "" + policy_name: str = "" + + +class ExternalDirectoryIdp(BaseModel): + id: str + name: str = "" + type: str = "" + status: str = "" diff --git a/tests/providers/okta/lib/service/raw_fetch_test.py b/tests/providers/okta/lib/service/raw_fetch_test.py new file mode 100644 index 0000000000..173e78888a --- /dev/null +++ b/tests/providers/okta/lib/service/raw_fetch_test.py @@ -0,0 +1,152 @@ +"""Tests for the raw-JSON HTTP helpers in +`prowler.providers.okta.lib.service.raw_fetch`. + +Covers `get_json` (single-shot) and `get_json_paginated` +(drains list endpoints via the `Link: rel="next"` cursor). +""" + +import asyncio +import json +from unittest import mock + +from prowler.providers.okta.lib.service.raw_fetch import ( + get_json, + get_json_paginated, +) + + +def _run(coro): + return asyncio.run(coro) + + +def _mock_response(headers: dict = None): + r = mock.MagicMock() + r.headers = headers or {} + return r + + +class Test_get_json: + def test_returns_parsed_json_on_success(self): + client = mock.MagicMock() + + async def create(*_a, **_k): + return ({"url": "/x"}, None) + + async def execute(_req): + return (_mock_response(), json.dumps({"hello": "world"}), None) + + client._request_executor.create_request = create + client._request_executor.execute = execute + + assert _run(get_json(client, "/x")) == {"hello": "world"} + + def test_returns_none_on_create_request_error(self): + client = mock.MagicMock() + + async def create(*_a, **_k): + return (None, Exception("boom")) + + client._request_executor.create_request = create + assert _run(get_json(client, "/x")) is None + + def test_returns_none_on_execute_error(self): + client = mock.MagicMock() + + async def create(*_a, **_k): + return ({"url": "/x"}, None) + + async def execute(_req): + return (_mock_response(), None, Exception("boom")) + + client._request_executor.create_request = create + client._request_executor.execute = execute + assert _run(get_json(client, "/x")) is None + + +class Test_get_json_paginated: + def test_drains_all_pages_following_link_rel_next(self): + # Two pages: first carries `Link: <…?after=cur1>; rel="next"`, + # second has no `next`, so iteration stops. + client = mock.MagicMock() + + page1 = [{"id": "a"}, {"id": "b"}] + page2 = [{"id": "c"}] + page1_headers = { + "link": '; rel="next"' + } + + seen_urls = [] + + async def create(**kwargs): + seen_urls.append(kwargs["url"]) + return ({"url": kwargs["url"]}, None) + + async def execute(request): + if "after=cur1" in request["url"]: + return (_mock_response({}), json.dumps(page2), None) + return (_mock_response(page1_headers), json.dumps(page1), None) + + client._request_executor.create_request = create + client._request_executor.execute = execute + + items = _run(get_json_paginated(client, "/api/v1/items", page_size=2)) + + assert items == [{"id": "a"}, {"id": "b"}, {"id": "c"}] + assert len(seen_urls) == 2 + assert "limit=2" in seen_urls[0] + # The cursor was carried into the second request. + assert "after=cur1" in seen_urls[1] + assert "limit=2" in seen_urls[1] + + def test_single_page_terminates_immediately(self): + client = mock.MagicMock() + + async def create(**kwargs): + return ({"url": kwargs["url"]}, None) + + async def execute(_req): + return (_mock_response({}), json.dumps([{"id": "only"}]), None) + + client._request_executor.create_request = create + client._request_executor.execute = execute + + assert _run(get_json_paginated(client, "/api/v1/items")) == [{"id": "only"}] + + def test_returns_none_when_response_is_not_a_list(self): + client = mock.MagicMock() + + async def create(**kwargs): + return ({"url": kwargs["url"]}, None) + + async def execute(_req): + return (_mock_response({}), json.dumps({"error": "nope"}), None) + + client._request_executor.create_request = create + client._request_executor.execute = execute + + assert _run(get_json_paginated(client, "/api/v1/items")) is None + + def test_preserves_existing_query_string_and_overrides_limit(self): + # Caller already passes `type=USER_LIFECYCLE` — pagination must + # merge `limit` without clobbering existing params. + client = mock.MagicMock() + seen = [] + + async def create(**kwargs): + seen.append(kwargs["url"]) + return ({"url": kwargs["url"]}, None) + + async def execute(_req): + return (_mock_response({}), "[]", None) + + client._request_executor.create_request = create + client._request_executor.execute = execute + + _run( + get_json_paginated( + client, "/api/v1/policies?type=USER_LIFECYCLE", page_size=50 + ) + ) + + assert "type=USER_LIFECYCLE" in seen[0] + assert "limit=50" in seen[0] diff --git a/tests/providers/okta/okta_fixtures.py b/tests/providers/okta/okta_fixtures.py index 23d770cf88..83c8812495 100644 --- a/tests/providers/okta/okta_fixtures.py +++ b/tests/providers/okta/okta_fixtures.py @@ -16,7 +16,13 @@ def set_mocked_okta_provider( session = OktaSession( org_domain=OKTA_ORG_DOMAIN, client_id=OKTA_CLIENT_ID, - scopes=["okta.policies.read", "okta.brands.read", "okta.apps.read"], + scopes=[ + "okta.policies.read", + "okta.brands.read", + "okta.apps.read", + "okta.logStreams.read", + "okta.idps.read", + ], private_key=OKTA_PRIVATE_KEY, ) if identity is None: @@ -27,6 +33,8 @@ def set_mocked_okta_provider( "okta.policies.read", "okta.brands.read", "okta.apps.read", + "okta.logStreams.read", + "okta.idps.read", ], ) diff --git a/tests/providers/okta/services/idp/idp_fixtures.py b/tests/providers/okta/services/idp/idp_fixtures.py new file mode 100644 index 0000000000..8ebc449717 --- /dev/null +++ b/tests/providers/okta/services/idp/idp_fixtures.py @@ -0,0 +1,44 @@ +"""Shared helpers for `idp` service check tests.""" + +from unittest import mock + +from prowler.providers.okta.services.idp.idp_service import OktaIdentityProvider +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider + + +def build_idp_client( + identity_providers: dict = None, + missing_scope: dict = None, +): + client = mock.MagicMock() + client.identity_providers = identity_providers or {} + client.provider = set_mocked_okta_provider() + client.audit_config = {} + client.missing_scope = missing_scope or {"identity_providers": None} + return client + + +def smart_card_idp( + idp_id: str = "0oa-x509", + name: str = "CAC IdP", + status: str = "ACTIVE", + issuer: str = "CN=DOD ROOT CA 6", + kid: str = "kid-abc-123", +): + return OktaIdentityProvider( + id=idp_id, + name=name, + type="X509", + status=status, + trust_issuer=issuer, + trust_kid=kid, + ) + + +def non_smart_card_idp( + idp_id: str = "0oa-saml", + name: str = "Corporate SAML", + type: str = "SAML2", + status: str = "ACTIVE", +): + return OktaIdentityProvider(id=idp_id, name=name, type=type, status=status) diff --git a/tests/providers/okta/services/idp/idp_service_test.py b/tests/providers/okta/services/idp/idp_service_test.py new file mode 100644 index 0000000000..3c30c0d3eb --- /dev/null +++ b/tests/providers/okta/services/idp/idp_service_test.py @@ -0,0 +1,80 @@ +import json +from unittest import mock + +from okta.models.identity_provider_protocol import IdentityProviderProtocol + +from prowler.providers.okta.services.idp.idp_service import Idp, OktaIdentityProvider +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider + + +def _resp(headers: dict = None): + r = mock.MagicMock() + r.headers = headers or {} + return r + + +def _fake_idp(idp_id, name, type_, status="ACTIVE", issuer=None, kid=None): + # Build a real `IdentityProviderProtocol` when issuer/kid are provided + # so the test exercises the SDK's Pydantic v2 oneOf wrapper — credentials + # live on `actual_instance`, not directly on the wrapper. MagicMock + # auto-attribute-creation would otherwise hide a missed unwrap. + idp = mock.MagicMock() + idp.id = idp_id + idp.name = name + idp.type = type_ + idp.status = status + if issuer is None and kid is None: + idp.protocol = None + else: + idp.protocol = IdentityProviderProtocol.from_json( + json.dumps( + { + "type": "MTLS", + "credentials": {"trust": {"issuer": issuer, "kid": kid}}, + } + ) + ) + return idp + + +def _patch_sdk(**methods): + return mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient", + return_value=mock.MagicMock(**methods), + ) + + +class Test_Idp_service: + def test_fetches_idps_with_trust_fields(self): + provider = set_mocked_okta_provider() + x509 = _fake_idp( + "0oa1", + "CAC", + "X509", + issuer="CN=DOD ROOT CA 6", + kid="kid-1", + ) + saml = _fake_idp("0oa2", "Corp", "SAML2") + + async def fake_list(*_a, **_k): + return ([x509, saml], _resp({}), None) + + with _patch_sdk(list_identity_providers=fake_list): + service = Idp(provider) + + assert set(service.identity_providers.keys()) == {"0oa1", "0oa2"} + assert isinstance(service.identity_providers["0oa1"], OktaIdentityProvider) + assert service.identity_providers["0oa1"].trust_issuer == "CN=DOD ROOT CA 6" + assert service.identity_providers["0oa1"].trust_kid == "kid-1" + assert service.identity_providers["0oa2"].trust_issuer is None + + def test_returns_empty_on_api_error(self): + provider = set_mocked_okta_provider() + + async def failing(*_a, **_k): + return ([], _resp({}), Exception("API failure")) + + with _patch_sdk(list_identity_providers=failing): + service = Idp(provider) + + assert service.identity_providers == {} diff --git a/tests/providers/okta/services/idp/idp_smart_card_dod_approved_ca/idp_smart_card_dod_approved_ca_test.py b/tests/providers/okta/services/idp/idp_smart_card_dod_approved_ca/idp_smart_card_dod_approved_ca_test.py new file mode 100644 index 0000000000..f6517e14a9 --- /dev/null +++ b/tests/providers/okta/services/idp/idp_smart_card_dod_approved_ca/idp_smart_card_dod_approved_ca_test.py @@ -0,0 +1,125 @@ +from unittest import mock + +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider +from tests.providers.okta.services.idp.idp_fixtures import ( + build_idp_client, + non_smart_card_idp, + smart_card_idp, +) + +CHECK_PATH = ( + "prowler.providers.okta.services.idp." + "idp_smart_card_dod_approved_ca.idp_smart_card_dod_approved_ca.idp_client" +) + +DOD_PKI_ISSUER = "CN=DoD ID CA-59, OU=PKI, OU=DoD, O=U.S. Government, C=US" +ECA_ISSUER = "CN=ECA Root CA 4, OU=ECA, O=U.S. Government, C=US" +NON_DOD_ISSUER = "CN=ACME Internal Root, O=Acme Corp, C=US" + + +def _run_check(client): + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_okta_provider(), + ), + mock.patch(CHECK_PATH, new=client), + ): + from prowler.providers.okta.services.idp.idp_smart_card_dod_approved_ca.idp_smart_card_dod_approved_ca import ( + idp_smart_card_dod_approved_ca, + ) + + return idp_smart_card_dod_approved_ca().execute() + + +class Test_idp_smart_card_dod_approved_ca: + def test_pass_when_active_idp_chain_matches_dod_pki_pattern(self): + idp = smart_card_idp(name="CAC", issuer=DOD_PKI_ISSUER, kid="kid-x") + client = build_idp_client(identity_providers={idp.id: idp}) + findings = _run_check(client) + assert len(findings) == 1 + assert findings[0].status == "PASS" + assert "OU=DoD" in findings[0].status_extended + assert DOD_PKI_ISSUER in findings[0].status_extended + + def test_pass_when_active_idp_chain_matches_eca_pattern(self): + idp = smart_card_idp(name="ECA Partner", issuer=ECA_ISSUER, kid="kid-e") + client = build_idp_client(identity_providers={idp.id: idp}) + findings = _run_check(client) + assert findings[0].status == "PASS" + assert "OU=ECA" in findings[0].status_extended + + def test_manual_when_active_but_issuer_does_not_match_any_pattern(self): + idp = smart_card_idp(name="Custom", issuer=NON_DOD_ISSUER, kid="kid-c") + client = build_idp_client(identity_providers={idp.id: idp}) + findings = _run_check(client) + assert findings[0].status == "MANUAL" + assert NON_DOD_ISSUER in findings[0].status_extended + assert "okta_dod_approved_ca_issuer_patterns" in findings[0].status_extended + + def test_pass_when_audit_config_pattern_matches(self): + idp = smart_card_idp(name="Custom DOD", issuer=NON_DOD_ISSUER, kid="kid-c") + client = build_idp_client(identity_providers={idp.id: idp}) + client.audit_config = { + "okta_dod_approved_ca_issuer_patterns": [r"CN=ACME Internal Root"] + } + findings = _run_check(client) + assert findings[0].status == "PASS" + + def test_audit_config_overrides_bundled_defaults(self): + # When the operator supplies a list, the bundled DEFAULT patterns + # are replaced (not merged) so customers can carve out a strict set. + idp = smart_card_idp(name="DoD", issuer=DOD_PKI_ISSUER, kid="kid-x") + client = build_idp_client(identity_providers={idp.id: idp}) + client.audit_config = { + "okta_dod_approved_ca_issuer_patterns": [r"CN=YourTenantCustomDodCA"] + } + findings = _run_check(client) + assert findings[0].status == "MANUAL" + + def test_malformed_audit_config_pattern_skipped(self): + # An invalid regex from the operator must not crash the whole check. + idp = smart_card_idp(name="CAC", issuer=DOD_PKI_ISSUER, kid="kid-x") + client = build_idp_client(identity_providers={idp.id: idp}) + client.audit_config = { + "okta_dod_approved_ca_issuer_patterns": [r"[invalid(regex", r"OU=DoD"] + } + findings = _run_check(client) + assert findings[0].status == "PASS" + + def test_fail_when_x509_idp_is_inactive(self): + idp = smart_card_idp(status="INACTIVE", issuer=DOD_PKI_ISSUER) + client = build_idp_client(identity_providers={idp.id: idp}) + findings = _run_check(client) + assert findings[0].status == "FAIL" + assert "INACTIVE" in findings[0].status_extended + + def test_fail_when_no_smart_card_idp_configured(self): + client = build_idp_client(identity_providers={"saml": non_smart_card_idp()}) + findings = _run_check(client) + assert findings[0].status == "FAIL" + assert ( + "No Smart Card (X509) Identity Providers are configured" + in findings[0].status_extended + ) + assert "mutelist" in findings[0].status_extended + + def test_manual_when_idps_scope_missing(self): + client = build_idp_client( + missing_scope={"identity_providers": "okta.idps.read"} + ) + findings = _run_check(client) + assert findings[0].status == "MANUAL" + assert "okta.idps.read" in findings[0].status_extended + + def test_multiple_x509_idps_yield_one_finding_each(self): + idp_a = smart_card_idp(idp_id="0oa-a", name="A", issuer=DOD_PKI_ISSUER) + idp_b = smart_card_idp( + idp_id="0oa-b", name="B", status="INACTIVE", issuer=DOD_PKI_ISSUER + ) + client = build_idp_client(identity_providers={idp_a.id: idp_a, idp_b.id: idp_b}) + findings = _run_check(client) + assert len(findings) == 2 + # We don't strictly assert ordering — just that both are covered. + statuses = sorted(f.status for f in findings) + assert statuses == ["FAIL", "PASS"] diff --git a/tests/providers/okta/services/signon/signon_service_test.py b/tests/providers/okta/services/signon/signon_service_test.py index c408bc1221..0caeb0e680 100644 --- a/tests/providers/okta/services/signon/signon_service_test.py +++ b/tests/providers/okta/services/signon/signon_service_test.py @@ -1,12 +1,14 @@ from unittest import mock +from prowler.providers.okta.lib.service.pagination import ( + next_after_cursor as _next_after_cursor, +) from prowler.providers.okta.models import OktaIdentityInfo from prowler.providers.okta.services.signon.signon_service import ( GlobalSessionPolicy, GlobalSessionPolicyRule, SignInPage, Signon, - _next_after_cursor, ) from tests.providers.okta.okta_fixtures import ( OKTA_CLIENT_ID, diff --git a/tests/providers/okta/services/systemlog/systemlog_fixtures.py b/tests/providers/okta/services/systemlog/systemlog_fixtures.py new file mode 100644 index 0000000000..efc8289f43 --- /dev/null +++ b/tests/providers/okta/services/systemlog/systemlog_fixtures.py @@ -0,0 +1,27 @@ +"""Shared helpers for `systemlog` service check tests.""" + +from unittest import mock + +from prowler.providers.okta.services.systemlog.systemlog_service import LogStream +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider + + +def build_systemlog_client( + log_streams: dict = None, + missing_scope: dict = None, +): + client = mock.MagicMock() + client.log_streams = log_streams or {} + client.provider = set_mocked_okta_provider() + client.audit_config = {} + client.missing_scope = missing_scope or {"log_streams": None} + return client + + +def log_stream( + stream_id: str = "log-1", + name: str = "EventBridge stream", + status: str = "ACTIVE", + type: str = "AWS_EVENTBRIDGE", +): + return LogStream(id=stream_id, name=name, status=status, type=type) diff --git a/tests/providers/okta/services/systemlog/systemlog_service_test.py b/tests/providers/okta/services/systemlog/systemlog_service_test.py new file mode 100644 index 0000000000..63dee95dcc --- /dev/null +++ b/tests/providers/okta/services/systemlog/systemlog_service_test.py @@ -0,0 +1,185 @@ +import json +from unittest import mock + +from prowler.providers.okta.models import OktaIdentityInfo +from prowler.providers.okta.services.systemlog.systemlog_service import ( + LogStream, + SystemLog, +) +from tests.providers.okta.okta_fixtures import ( + OKTA_CLIENT_ID, + OKTA_ORG_DOMAIN, + set_mocked_okta_provider, +) + + +def _resp(headers: dict = None): + r = mock.MagicMock() + r.headers = headers or {} + return r + + +def _fake_stream( + stream_id: str, name: str, status: str = "ACTIVE", type_: str = "AWS_EVENTBRIDGE" +): + s = mock.MagicMock() + s.id = stream_id + s.name = name + s.status = status + s.type = type_ + return s + + +def _patch_sdk(**methods): + return mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient", + return_value=mock.MagicMock(**methods), + ) + + +class Test_SystemLog_service: + def test_fetches_active_streams(self): + provider = set_mocked_okta_provider() + s1 = _fake_stream("log-1", "EventBridge") + s2 = _fake_stream("log-2", "Splunk", type_="SPLUNK_CLOUD_LOGSTREAMING") + + async def fake_list(*_a, **_k): + return ([s1, s2], _resp({}), None) + + with _patch_sdk(list_log_streams=fake_list): + service = SystemLog(provider) + + assert set(service.log_streams.keys()) == {"log-1", "log-2"} + assert isinstance(service.log_streams["log-1"], LogStream) + assert service.log_streams["log-2"].type == "SPLUNK_CLOUD_LOGSTREAMING" + + def test_returns_empty_on_api_error(self): + provider = set_mocked_okta_provider() + + async def failing(*_a, **_k): + return ([], _resp({}), Exception("E0000007")) + + with _patch_sdk(list_log_streams=failing): + service = SystemLog(provider) + + assert service.log_streams == {} + + def test_skips_fetch_when_scope_missing(self): + identity = OktaIdentityInfo( + org_domain=OKTA_ORG_DOMAIN, + client_id=OKTA_CLIENT_ID, + granted_scopes=["okta.policies.read"], # no logStreams scope + ) + provider = set_mocked_okta_provider(identity=identity) + + called = False + + async def fake_list(*_a, **_k): + nonlocal called + called = True + return ([], _resp({}), None) + + with _patch_sdk(list_log_streams=fake_list): + service = SystemLog(provider) + + assert called is False + assert service.log_streams == {} + assert service.missing_scope["log_streams"] == "okta.logStreams.read" + + +class Test_SystemLog_service_sdk_validation_fallback: + """Verifies the raw-JSON fallback when the Okta SDK rejects API values. + + The SDK's `LogStreamSettingsAws.eventSourceName` validator uses the + regex `^[a-zA-Z0-9.\\-_]$` — missing the `+` quantifier, so every + multi-character name raises pydantic `ValidationError`. Without the + fallback the whole stream list is lost; with it, the raw JSON path + still surfaces each stream's id/name/status/type. + """ + + def test_raw_fallback_projects_streams_when_sdk_raises(self): + from pydantic import ValidationError + + provider = set_mocked_okta_provider() + + raw_payload = [ + { + "id": "log-1", + "name": "EventBridge prod", + "status": "ACTIVE", + "type": "AWS_EVENTBRIDGE", + }, + { + "id": "log-2", + "name": "Splunk staging", + "status": "INACTIVE", + "type": "SPLUNK_CLOUD_LOGSTREAMING", + }, + ] + + async def failing_list_log_streams(*_a, **_k): + try: + # Trigger a real pydantic ValidationError so we exercise + # the exact exception type the SDK raises in production. + from okta.models.log_stream_settings_aws import LogStreamSettingsAws + + LogStreamSettingsAws( + accountId="123456789012", + eventSourceName="MultiCharacter", + region="us-east-1", + ) + except ValidationError as ve: + raise ve + return ([], _resp({}), None) + + async def fake_raw_create(*_a, **_k): + return ({"url": "/api/v1/logStreams"}, None) + + async def fake_raw_execute(_request): + return (None, json.dumps(raw_payload), None) + + sdk = mock.MagicMock() + sdk.list_log_streams = failing_list_log_streams + sdk._request_executor.create_request = fake_raw_create + sdk._request_executor.execute = fake_raw_execute + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient", + return_value=sdk, + ): + service = SystemLog(provider) + + assert set(service.log_streams.keys()) == {"log-1", "log-2"} + assert service.log_streams["log-1"].status == "ACTIVE" + assert service.log_streams["log-2"].status == "INACTIVE" + assert service.log_streams["log-2"].type == "SPLUNK_CLOUD_LOGSTREAMING" + + def test_raw_fallback_handles_empty_list(self): + from pydantic import ValidationError + + provider = set_mocked_okta_provider() + + async def failing(*_a, **_k): + raise ValidationError.from_exception_data( + title="LogStreamSettingsAws", + line_errors=[], + ) + + async def fake_create(*_a, **_k): + return ({"url": "/api/v1/logStreams"}, None) + + async def fake_execute(_req): + return (None, "[]", None) + + sdk = mock.MagicMock() + sdk.list_log_streams = failing + sdk._request_executor.create_request = fake_create + sdk._request_executor.execute = fake_execute + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient", + return_value=sdk, + ): + service = SystemLog(provider) + + assert service.log_streams == {} diff --git a/tests/providers/okta/services/systemlog/systemlog_streaming_enabled/systemlog_streaming_enabled_test.py b/tests/providers/okta/services/systemlog/systemlog_streaming_enabled/systemlog_streaming_enabled_test.py new file mode 100644 index 0000000000..64d2bcb8fc --- /dev/null +++ b/tests/providers/okta/services/systemlog/systemlog_streaming_enabled/systemlog_streaming_enabled_test.py @@ -0,0 +1,73 @@ +from unittest import mock + +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider +from tests.providers.okta.services.systemlog.systemlog_fixtures import ( + build_systemlog_client, + log_stream, +) + +CHECK_PATH = ( + "prowler.providers.okta.services.systemlog." + "systemlog_streaming_enabled.systemlog_streaming_enabled.systemlog_client" +) + + +def _run_check(client): + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_okta_provider(), + ), + mock.patch(CHECK_PATH, new=client), + ): + from prowler.providers.okta.services.systemlog.systemlog_streaming_enabled.systemlog_streaming_enabled import ( + systemlog_streaming_enabled, + ) + + return systemlog_streaming_enabled().execute() + + +class Test_systemlog_streaming_enabled: + def test_pass_when_active_stream_exists(self): + client = build_systemlog_client( + log_streams={"log-1": log_stream(name="EventBridge prod")} + ) + findings = _run_check(client) + assert len(findings) == 1 + assert findings[0].status == "PASS" + assert "EventBridge prod" in findings[0].status_extended + + def test_pass_when_multiple_active_streams(self): + client = build_systemlog_client( + log_streams={ + "log-1": log_stream(stream_id="log-1", name="A"), + "log-2": log_stream(stream_id="log-2", name="B"), + } + ) + findings = _run_check(client) + assert len(findings) == 2 + assert all(f.status == "PASS" for f in findings) + + def test_fail_when_all_streams_inactive(self): + client = build_systemlog_client( + log_streams={"log-1": log_stream(name="A", status="INACTIVE")} + ) + findings = _run_check(client) + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "none are ACTIVE" in findings[0].status_extended + + def test_fail_when_no_streams_configured(self): + client = build_systemlog_client(log_streams={}) + findings = _run_check(client) + assert findings[0].status == "FAIL" + assert "No Okta Log Streams are configured" in findings[0].status_extended + assert "mutelist" in findings[0].status_extended + + def test_manual_when_scope_missing(self): + client = build_systemlog_client( + missing_scope={"log_streams": "okta.logStreams.read"} + ) + findings = _run_check(client) + assert findings[0].status == "MANUAL" + assert "okta.logStreams.read" in findings[0].status_extended diff --git a/tests/providers/okta/services/user/user_fixtures.py b/tests/providers/okta/services/user/user_fixtures.py new file mode 100644 index 0000000000..99282c1efa --- /dev/null +++ b/tests/providers/okta/services/user/user_fixtures.py @@ -0,0 +1,55 @@ +"""Shared helpers for `user` service check tests.""" + +from unittest import mock + +from prowler.providers.okta.services.user.user_service import ( + ExternalDirectoryIdp, + UserAutomation, +) +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider + + +def build_user_client( + automations: dict = None, + external_directory_idps: dict = None, + audit_config: dict = None, + missing_scope: dict = None, +): + client = mock.MagicMock() + client.automations = automations or {} + client.external_directory_idps = external_directory_idps or {} + client.provider = set_mocked_okta_provider() + client.audit_config = audit_config or {} + client.missing_scope = missing_scope or { + "automations": None, + "identity_providers": None, + } + return client + + +def automation( + automation_id: str = "auto-1", + name: str = "User Inactivity", + status: str = "ACTIVE", + schedule_status: str = "ACTIVE", + inactivity_days: int = 35, + lifecycle_action: str = "SUSPENDED", + groups: list = None, +): + # `groups is None` keeps the "Everyone-equivalent" default; passing + # `groups=[]` lets a test exercise the empty-scope FAIL path. + return UserAutomation( + id=automation_id, + name=name, + status=status, + schedule_status=schedule_status, + inactivity_days=inactivity_days, + lifecycle_action=lifecycle_action, + applies_to_groups=["everyone"] if groups is None else groups, + ) + + +def ad_idp(idp_id: str = "0oa-ad", name: str = "Corp AD"): + return ExternalDirectoryIdp( + id=idp_id, name=name, type="ACTIVE_DIRECTORY", status="ACTIVE" + ) diff --git a/tests/providers/okta/services/user/user_inactivity_automation_35d_enabled/user_inactivity_automation_35d_enabled_test.py b/tests/providers/okta/services/user/user_inactivity_automation_35d_enabled/user_inactivity_automation_35d_enabled_test.py new file mode 100644 index 0000000000..99cb7db6a0 --- /dev/null +++ b/tests/providers/okta/services/user/user_inactivity_automation_35d_enabled/user_inactivity_automation_35d_enabled_test.py @@ -0,0 +1,165 @@ +from unittest import mock + +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider +from tests.providers.okta.services.user.user_fixtures import ( + ad_idp, + automation, + build_user_client, +) + +CHECK_PATH = ( + "prowler.providers.okta.services.user." + "user_inactivity_automation_35d_enabled." + "user_inactivity_automation_35d_enabled.user_client" +) + + +def _run_check(client): + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_okta_provider(), + ), + mock.patch(CHECK_PATH, new=client), + ): + from prowler.providers.okta.services.user.user_inactivity_automation_35d_enabled.user_inactivity_automation_35d_enabled import ( + user_inactivity_automation_35d_enabled, + ) + + return user_inactivity_automation_35d_enabled().execute() + + +class Test_user_inactivity_automation_35d_enabled: + def test_pass_when_compliant_automation_present(self): + client = build_user_client( + automations={"auto-1": automation(name="Inactivity 35d")} + ) + findings = _run_check(client) + assert findings[0].status == "PASS" + assert "Inactivity 35d" in findings[0].status_extended + assert "SUSPENDED" in findings[0].status_extended + + def test_pass_message_names_groups_and_asks_for_coverage_verification(self): + # Okta has no built-in Everyone group ID and group names vary by + # tenant (e.g. "pepito"), so we can't assert tenant-wide coverage + # automatically — surface the group IDs and let the operator verify. + client = build_user_client( + automations={"auto-1": automation(groups=["grp-A", "grp-B"])} + ) + findings = _run_check(client) + assert findings[0].status == "PASS" + assert "grp-A, grp-B" in findings[0].status_extended + assert "cover every user" in findings[0].status_extended + + def test_fail_when_applies_to_no_group(self): + # An automation with empty `people.groups.include` runs against + # nobody — Okta does not implicitly cover every user. + client = build_user_client(automations={"auto-1": automation(groups=[])}) + findings = _run_check(client) + assert findings[0].status == "FAIL" + assert "no group scope" in findings[0].status_extended + + def test_pass_when_lower_threshold(self): + # Inactivity threshold lower than the default is still compliant. + client = build_user_client( + automations={"auto-1": automation(inactivity_days=14)} + ) + findings = _run_check(client) + assert findings[0].status == "PASS" + + def test_fail_when_threshold_too_high(self): + client = build_user_client( + automations={"auto-1": automation(inactivity_days=90)} + ) + findings = _run_check(client) + assert findings[0].status == "FAIL" + assert "inactivity 90d (max 35d)" in findings[0].status_extended + + def test_fail_when_status_inactive(self): + client = build_user_client( + automations={"auto-1": automation(status="INACTIVE")} + ) + findings = _run_check(client) + assert findings[0].status == "FAIL" + assert "status INACTIVE" in findings[0].status_extended + + def test_fail_when_schedule_inactive(self): + client = build_user_client( + automations={"auto-1": automation(schedule_status="INACTIVE")} + ) + findings = _run_check(client) + assert findings[0].status == "FAIL" + assert "schedule INACTIVE" in findings[0].status_extended + + def test_fail_when_wrong_lifecycle_action(self): + client = build_user_client( + automations={"auto-1": automation(lifecycle_action="ACTIVE")} + ) + findings = _run_check(client) + assert findings[0].status == "FAIL" + assert "action ACTIVE" in findings[0].status_extended + + def test_fail_when_no_automations(self): + client = build_user_client(automations={}) + findings = _run_check(client) + assert findings[0].status == "FAIL" + assert "No Okta Workflows automations" in findings[0].status_extended + + def test_fail_lists_every_missing_piece_for_unfinished_automation(self): + # Mirrors the real-world case where an admin clicks "Add Automation" + # in the UI but never configures conditions or actions. The service + # emits a placeholder UserAutomation so the check FAILs with a + # specific message instead of pretending the policy doesn't exist. + from prowler.providers.okta.services.user.user_service import UserAutomation + + shell = UserAutomation( + id="pol-1", + name="TestCheck", + status="INACTIVE", + schedule_status="INACTIVE", + inactivity_days=None, + lifecycle_action=None, + applies_to_groups=[], + policy_id="pol-1", + policy_name="TestCheck", + ) + client = build_user_client(automations={"pol-1": shell}) + findings = _run_check(client) + assert findings[0].status == "FAIL" + msg = findings[0].status_extended + assert "TestCheck" in msg + assert "status INACTIVE" in msg + assert "schedule INACTIVE" in msg + assert "no inactivity condition" in msg + assert "action unset" in msg + + def test_manual_na_when_external_directory_idp_present(self): + client = build_user_client( + automations={"auto-1": automation(inactivity_days=90)}, # non-compliant + external_directory_idps={"0oa-ad": ad_idp(name="Corp AD")}, + ) + findings = _run_check(client) + # External directory short-circuits to MANUAL N/A regardless of + # the automations state. + assert findings[0].status == "MANUAL" + assert "ACTIVE_DIRECTORY" in findings[0].status_extended + assert "Corp AD" in findings[0].status_extended + + def test_manual_when_scope_missing(self): + client = build_user_client( + missing_scope={ + "automations": "okta.policies.read", + "identity_providers": None, + } + ) + findings = _run_check(client) + assert findings[0].status == "MANUAL" + assert "okta.policies.read" in findings[0].status_extended + + def test_threshold_overridden_via_audit_config(self): + client = build_user_client( + automations={"auto-1": automation(inactivity_days=60)}, + audit_config={"okta_user_inactivity_max_days": 90}, + ) + findings = _run_check(client) + assert findings[0].status == "PASS" diff --git a/tests/providers/okta/services/user/user_service_test.py b/tests/providers/okta/services/user/user_service_test.py new file mode 100644 index 0000000000..f4e0309b7b --- /dev/null +++ b/tests/providers/okta/services/user/user_service_test.py @@ -0,0 +1,477 @@ +import json +from unittest import mock + +from prowler.providers.okta.services.user.user_service import ( + ExternalDirectoryIdp, + User, + UserAutomation, + _raw_rule_to_automation, + _rule_to_automation, +) +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider + + +def _resp(headers: dict = None): + r = mock.MagicMock() + r.headers = headers or {} + return r + + +def _fake_policy( + policy_id, + name="Inactivity Policy", + status="ACTIVE", + inactivity_days=35, + inactivity_unit="DAYS", + groups=None, +): + # In the actual API response, the inactivity condition and the + # group scope live on the *policy*, not on its rules — keep the + # typed fixture aligned with that shape so it mirrors raw JSON. + p = mock.MagicMock() + p.id = policy_id + p.name = name + p.status = status + if inactivity_days is None: + p.conditions.people.users.inactivity = None + else: + p.conditions.people.users.inactivity.number = inactivity_days + p.conditions.people.users.inactivity.unit = inactivity_unit + p.conditions.people.groups.include = ["everyone"] if groups is None else groups + return p + + +def _fake_rule( + rule_id="rule-1", + name="Inactivity", + status="ACTIVE", + lifecycle_action="SUSPENDED", +): + # A USER_LIFECYCLE policy rule carries only the lifecycle action; + # its `conditions` is typically empty. + r = mock.MagicMock() + r.id = rule_id + r.name = name + r.status = status + r.actions.user_lifecycle.action = lifecycle_action + return r + + +def _fake_idp(idp_type, status="ACTIVE", idp_id="0oa-1", name="x"): + idp = mock.MagicMock() + idp.id = idp_id + idp.name = name + idp.type = idp_type + idp.status = status + return idp + + +def _patch_sdk(**methods): + return mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient", + return_value=mock.MagicMock(**methods), + ) + + +class Test_rule_to_automation: + def test_parses_inactivity_and_lifecycle(self): + policy = _fake_policy("pol-1", name="Inactivity Policy") + rule = _fake_rule(rule_id="rule-1", name="Inactivity") + m = _rule_to_automation(rule, policy) + assert isinstance(m, UserAutomation) + assert m.id == "rule-1" + assert m.status == "ACTIVE" + assert m.schedule_status == "ACTIVE" + assert m.inactivity_days == 35 + assert m.lifecycle_action == "SUSPENDED" + assert m.applies_to_groups == ["everyone"] + assert m.policy_id == "pol-1" + assert m.policy_name == "Inactivity Policy" + + def test_returns_none_when_id_missing(self): + policy = _fake_policy("pol") + bad = _fake_rule() + bad.id = "" + assert _rule_to_automation(bad, policy) is None + + def test_ignores_non_days_unit(self): + policy = _fake_policy("pol", inactivity_unit="WEEKS") + rule = _fake_rule() + m = _rule_to_automation(rule, policy) + assert m.inactivity_days is None + + def test_reads_inactivity_and_groups_from_policy_not_rule(self): + # The typed path used to read inactivity/groups from the rule; + # an SDK update that started populating `policy.conditions` + # exposed the mismatch. Locking the policy-shaped projection in. + policy = _fake_policy("pol", inactivity_days=21, groups=["grp-x"]) + rule = _fake_rule() + # Sanity: nothing inactivity-ish on the rule. + del rule.conditions + m = _rule_to_automation(rule, policy) + assert m.inactivity_days == 21 + assert m.applies_to_groups == ["grp-x"] + + +class Test_User_service: + def test_fetches_automations_via_policy_api(self): + provider = set_mocked_okta_provider() + policy = _fake_policy("pol-1") + rule = _fake_rule(rule_id="rule-1") + + async def fake_list_policies(*_a, **_k): + return ([policy], _resp({}), None) + + async def fake_list_rules(*_a, **_k): + return ([rule], _resp({}), None) + + async def fake_list_idps(*_a, **_k): + return ([], _resp({}), None) + + sdk = mock.MagicMock() + sdk.list_policies = fake_list_policies + sdk.list_policy_rules = fake_list_rules + sdk.list_identity_providers = fake_list_idps + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient", + return_value=sdk, + ): + service = User(provider) + + assert "rule-1" in service.automations + assert service.automations["rule-1"].inactivity_days == 35 + assert service.external_directory_idps == {} + + def test_returns_empty_on_policies_api_error(self): + provider = set_mocked_okta_provider() + + async def failing(*_a, **_k): + return ([], _resp({}), Exception("E0000007")) + + async def fake_list_idps(*_a, **_k): + return ([], _resp({}), None) + + sdk = mock.MagicMock() + sdk.list_policies = failing + sdk.list_identity_providers = fake_list_idps + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient", + return_value=sdk, + ): + service = User(provider) + + assert service.automations == {} + + def test_detects_external_directory_idp(self): + provider = set_mocked_okta_provider() + + async def empty_policies(*_a, **_k): + return ([], _resp({}), None) + + ad = _fake_idp("ACTIVE_DIRECTORY", idp_id="0oa-ad", name="Corp AD") + saml = _fake_idp("SAML2", idp_id="0oa-saml") + + async def fake_list_idps(*_a, **_k): + return ([ad, saml], _resp({}), None) + + sdk = mock.MagicMock() + sdk.list_policies = empty_policies + sdk.list_identity_providers = fake_list_idps + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient", + return_value=sdk, + ): + service = User(provider) + + assert "0oa-ad" in service.external_directory_idps + assert "0oa-saml" not in service.external_directory_idps + assert isinstance( + service.external_directory_idps["0oa-ad"], ExternalDirectoryIdp + ) + + +class Test_raw_rule_to_automation: + def test_projects_inactivity_and_lifecycle(self): + # Real API shape: inactivity + groups live on the POLICY, + # lifecycle action lives on the RULE under + # `actions.updateUserLifecycle.targetStatus`. + policy = { + "id": "pol-1", + "name": "TestCheck", + "status": "ACTIVE", + "conditions": { + "people": { + "users": {"inactivity": {"number": 35, "unit": "DAYS"}}, + "groups": {"include": ["everyone"]}, + } + }, + "type": "USER_LIFECYCLE", + } + rule = { + "id": "rule-1", + "name": "lifecycle-rule-1", + "status": "ACTIVE", + "conditions": {}, + "actions": { + "updateUserLifecycle": { + "targetStatus": "SUSPENDED", + "quietPeriod": {"number": 0, "unit": "DAYS"}, + } + }, + } + m = _raw_rule_to_automation(rule, policy, "pol-1", "TestCheck", "ACTIVE") + assert isinstance(m, UserAutomation) + assert m.id == "rule-1" + assert m.status == "ACTIVE" + assert m.schedule_status == "ACTIVE" + assert m.inactivity_days == 35 + assert m.lifecycle_action == "SUSPENDED" + assert m.applies_to_groups == ["everyone"] + assert m.policy_id == "pol-1" + assert m.policy_name == "TestCheck" + + def test_returns_none_when_id_missing(self): + assert _raw_rule_to_automation({"name": "x"}, {}, "pol", "P", "ACTIVE") is None + + def test_ignores_non_days_unit(self): + policy = { + "id": "pol", + "conditions": { + "people": {"users": {"inactivity": {"number": 5, "unit": "WEEKS"}}} + }, + } + rule = {"id": "rule-2", "actions": {}} + m = _raw_rule_to_automation(rule, policy, "pol", "P", "ACTIVE") + assert m.inactivity_days is None + + def test_missing_policy_dict_gives_empty_inactivity_and_groups(self): + rule = { + "id": "rule-3", + "actions": {"updateUserLifecycle": {"targetStatus": "SUSPENDED"}}, + } + m = _raw_rule_to_automation(rule, None, "pol", "P", "ACTIVE") + assert m.inactivity_days is None + assert m.applies_to_groups == [] + assert m.lifecycle_action == "SUSPENDED" + + +class Test_User_service_sdk_discriminator_fallback: + """Verifies the raw-JSON fallback when the SDK can't deserialize USER_LIFECYCLE. + + Okta SDK 3.4.2 ships a `Policy.from_dict` discriminator mapping that + omits `USER_LIFECYCLE`, so the typed call raises ValueError. Without + the fallback the whole automations list is lost; with it the raw + JSON path projects each rule onto a `UserAutomation` snapshot. + """ + + def test_raw_fallback_projects_user_lifecycle_policy_rules(self): + provider = set_mocked_okta_provider() + + # Real API shape: inactivity + groups on POLICY, lifecycle + # action on RULE under `actions.updateUserLifecycle.targetStatus`. + policy_payload = [ + { + "id": "pol-1", + "name": "TestCheck", + "status": "ACTIVE", + "type": "USER_LIFECYCLE", + "conditions": { + "people": { + "users": {"inactivity": {"number": 35, "unit": "DAYS"}}, + "groups": {"include": ["everyone"]}, + } + }, + } + ] + rules_payload = [ + { + "id": "rule-1", + "name": "lifecycle-rule-1", + "status": "ACTIVE", + "conditions": {}, + "actions": { + "updateUserLifecycle": { + "targetStatus": "SUSPENDED", + "quietPeriod": {"number": 0, "unit": "DAYS"}, + } + }, + } + ] + + async def failing_list_policies(*_a, **_k): + raise ValueError( + "Policy failed to lookup discriminator value from {...}. " + "Discriminator property name: type, mapping: {...}" + ) + + async def fake_list_idps(*_a, **_k): + return ([], _resp({}), None) + + async def fake_raw_create(*_a, **kwargs): + url = kwargs.get("url", "") or "" + return ({"url": url}, None) + + async def fake_raw_execute(request): + url = request.get("url", "") + if "/api/v1/policies/pol-1/rules" in url: + return (None, json.dumps(rules_payload), None) + if "/api/v1/policies" in url: + return (None, json.dumps(policy_payload), None) + return (None, "[]", None) + + sdk = mock.MagicMock() + sdk.list_policies = failing_list_policies + sdk.list_identity_providers = fake_list_idps + sdk._request_executor.create_request = fake_raw_create + sdk._request_executor.execute = fake_raw_execute + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient", + return_value=sdk, + ): + service = User(provider) + + assert "rule-1" in service.automations + a = service.automations["rule-1"] + assert a.inactivity_days == 35 + assert a.lifecycle_action == "SUSPENDED" + assert a.schedule_status == "ACTIVE" + assert a.policy_id == "pol-1" + assert a.policy_name == "TestCheck" + + def test_raw_fallback_emits_shell_for_policy_with_no_rules(self): + # Mirrors the real-world tenant state where an admin clicked + # "Add Automation" in the UI but never configured conditions or + # actions. The policy exists; it has zero rules. The raw fallback + # must surface the policy as a shell UserAutomation so the check + # FAILs with a specific message instead of dropping it. + provider = set_mocked_okta_provider() + + async def failing_list_policies(*_a, **_k): + raise ValueError("missing discriminator mapping") + + async def fake_list_idps(*_a, **_k): + return ([], _resp({}), None) + + async def fake_raw_create(*_a, **kwargs): + return ({"url": kwargs.get("url", "") or ""}, None) + + async def fake_raw_execute(request): + url = request.get("url", "") + if "/api/v1/policies/pol-empty/rules" in url: + return (None, "[]", None) + if "/api/v1/policies" in url: + return ( + None, + json.dumps( + [ + { + "id": "pol-empty", + "name": "TestCheck", + "status": "INACTIVE", + "type": "USER_LIFECYCLE", + } + ] + ), + None, + ) + return (None, "[]", None) + + sdk = mock.MagicMock() + sdk.list_policies = failing_list_policies + sdk.list_identity_providers = fake_list_idps + sdk._request_executor.create_request = fake_raw_create + sdk._request_executor.execute = fake_raw_execute + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient", + return_value=sdk, + ): + service = User(provider) + + assert "pol-empty" in service.automations + shell = service.automations["pol-empty"] + assert shell.name == "TestCheck" + assert shell.status == "INACTIVE" + assert shell.schedule_status == "INACTIVE" + assert shell.inactivity_days is None + assert shell.lifecycle_action is None + assert shell.applies_to_groups == [] + assert shell.policy_id == "pol-empty" + + def test_rule_typed_failure_triggers_raw_fallback_for_all_policies(self): + # When the typed `list_policies` succeeds but the typed + # `list_policy_rules` fails for a policy, the previous behavior + # was to emit a shell automation — silently misclassifying a + # valid automation as "unfinished". Now `_fetch_rules` returns + # None as a sentinel and the caller re-runs the entire + # discovery via raw JSON so no rule data is lost. + provider = set_mocked_okta_provider() + + typed_policy = _fake_policy( + "pol-1", name="TestCheck", inactivity_days=35, groups=["everyone"] + ) + + async def fake_list_policies(*_a, **_k): + return ([typed_policy], _resp({}), None) + + async def failing_list_policy_rules(*_a, **_k): + raise ValueError("KnowledgeConstraint.types expected uppercase") + + async def fake_list_idps(*_a, **_k): + return ([], _resp({}), None) + + raw_policy_payload = [ + { + "id": "pol-1", + "name": "TestCheck", + "status": "ACTIVE", + "type": "USER_LIFECYCLE", + "conditions": { + "people": { + "users": {"inactivity": {"number": 35, "unit": "DAYS"}}, + "groups": {"include": ["everyone"]}, + } + }, + } + ] + raw_rules_payload = [ + { + "id": "rule-1", + "name": "lifecycle-rule-1", + "status": "ACTIVE", + "actions": {"updateUserLifecycle": {"targetStatus": "SUSPENDED"}}, + } + ] + + async def fake_raw_create(*_a, **kwargs): + return ({"url": kwargs.get("url", "") or ""}, None) + + async def fake_raw_execute(request): + url = request.get("url", "") + if "/api/v1/policies/pol-1/rules" in url: + return (None, json.dumps(raw_rules_payload), None) + if "/api/v1/policies" in url: + return (None, json.dumps(raw_policy_payload), None) + return (None, "[]", None) + + sdk = mock.MagicMock() + sdk.list_policies = fake_list_policies + sdk.list_policy_rules = failing_list_policy_rules + sdk.list_identity_providers = fake_list_idps + sdk._request_executor.create_request = fake_raw_create + sdk._request_executor.execute = fake_raw_execute + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient", + return_value=sdk, + ): + service = User(provider) + + # Raw-projected automation, not a shell. + assert "rule-1" in service.automations + assert service.automations["rule-1"].inactivity_days == 35 + assert service.automations["rule-1"].lifecycle_action == "SUSPENDED"