diff --git a/prowler/providers/okta/lib/service/scope.py b/prowler/providers/okta/lib/service/scope.py new file mode 100644 index 0000000000..ead482babc --- /dev/null +++ b/prowler/providers/okta/lib/service/scope.py @@ -0,0 +1,38 @@ +from pydantic import BaseModel + +from prowler.lib.check.models import CheckReportOkta + + +class MissingScopeResource(BaseModel): + """Synthetic resource used when a check cannot evaluate an Okta API.""" + + id: str + name: str + + +def missing_scope_finding( + *, + metadata, + org_domain: str, + resource_id: str, + resource_name: str, + missing_scopes: list[str], + action: str, +) -> CheckReportOkta: + """Build a MANUAL finding for checks blocked by missing OAuth scopes.""" + resource = MissingScopeResource(id=resource_id, name=resource_name) + report = CheckReportOkta( + metadata=metadata, + resource=resource, + org_domain=org_domain, + resource_id=resource.id, + resource_name=resource.name, + ) + report.status = "MANUAL" + report.status_extended = ( + f"Prowler could not {action} because the Okta service app is missing " + f"required OAuth scope(s): {', '.join(missing_scopes)}. Grant the " + "scope(s) to the service app and rerun the check, or review the " + "configuration manually in the Okta Admin Console." + ) + return report diff --git a/prowler/providers/okta/lib/service/service.py b/prowler/providers/okta/lib/service/service.py index baaabcb219..c0661aae38 100644 --- a/prowler/providers/okta/lib/service/service.py +++ b/prowler/providers/okta/lib/service/service.py @@ -32,3 +32,8 @@ class OktaService: def _run(coro): """Run an okta-sdk-python coroutine from synchronous code.""" return asyncio.run(coro) + + def _missing_scopes(self, required_scopes: list[str]) -> list[str]: + """Return required OAuth scopes not granted to the Okta service app.""" + granted_scopes = set(getattr(self.provider.session, "scopes", []) or []) + return [scope for scope in required_scopes if scope not in granted_scopes] diff --git a/prowler/providers/okta/okta_provider.py b/prowler/providers/okta/okta_provider.py index a6bff16d80..ab7f047fb8 100644 --- a/prowler/providers/okta/okta_provider.py +++ b/prowler/providers/okta/okta_provider.py @@ -32,7 +32,14 @@ from prowler.providers.okta.exceptions.exceptions import ( from prowler.providers.okta.lib.mutelist.mutelist import OktaMutelist from prowler.providers.okta.models import OktaIdentityInfo, OktaSession -DEFAULT_SCOPES = ["okta.policies.read", "okta.brands.read"] +DEFAULT_SCOPES = [ + "okta.policies.read", + "okta.brands.read", + "okta.networkZones.read", + "okta.apiTokens.read", + "okta.roles.read", + "okta.authenticators.read", +] # Accept only Okta-managed domains. Custom (vanity) domains are rejected on # purpose — they're a recurring source of typos and silent misconfig and # Prowler's audience overwhelmingly uses Okta-managed hosts. The TLDs below diff --git a/prowler/providers/okta/services/apitoken/__init__.py b/prowler/providers/okta/services/apitoken/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/apitoken/api_token_client.py b/prowler/providers/okta/services/apitoken/api_token_client.py new file mode 100644 index 0000000000..fbe10d7c7f --- /dev/null +++ b/prowler/providers/okta/services/apitoken/api_token_client.py @@ -0,0 +1,4 @@ +from prowler.providers.common.provider import Provider +from prowler.providers.okta.services.apitoken.api_token_service import ApiToken + +api_token_client = ApiToken(Provider.get_global_provider()) diff --git a/prowler/providers/okta/services/apitoken/api_token_service.py b/prowler/providers/okta/services/apitoken/api_token_service.py new file mode 100644 index 0000000000..3e7b0e1471 --- /dev/null +++ b/prowler/providers/okta/services/apitoken/api_token_service.py @@ -0,0 +1,201 @@ +from typing import Optional +from urllib.parse import parse_qs, urlparse + +from pydantic import BaseModel, Field + +from prowler.lib.logger import logger +from prowler.providers.okta.lib.service.service import OktaService + +API_TOKENS_READ_SCOPE = "okta.apiTokens.read" +NETWORK_ZONES_READ_SCOPE = "okta.networkZones.read" +ROLES_READ_SCOPE = "okta.roles.read" + + +def _next_after_cursor(resp) -> Optional[str]: + """Extract the Okta pagination cursor from a Link header.""" + if resp is None: + return None + headers = getattr(resp, "headers", None) or {} + link = headers.get("link") or headers.get("Link") or "" + if not link: + return None + for part in link.split(","): + if 'rel="next"' not in part: + continue + url_segment = part.split(";", 1)[0].strip().lstrip("<").rstrip(">") + cursor = parse_qs(urlparse(url_segment).query).get("after", [None])[0] + if cursor: + return cursor + return None + + +def _normalise_sdk_result(result) -> tuple[list, object, object]: + """Return `(items, response, error)` for Okta SDK list call variants.""" + if isinstance(result, tuple): + err = result[-1] + items = result[0] or [] + resp = result[1] if len(result) >= 3 else None + return list(items), resp, err + return list(result or []), None, None + + +def _value(value) -> str: + """Return plain string values from Okta SDK enums and raw strings.""" + if value is None: + return "" + enum_value = getattr(value, "value", None) + if enum_value is not None: + return str(enum_value) + return str(value) + + +class ApiToken(OktaService): + """Fetches Okta API token metadata, token owners' roles, and zones.""" + + def __init__(self, provider): + super().__init__(__class__.__name__, provider) + self.missing_scopes: list[str] = self._missing_scopes( + [API_TOKENS_READ_SCOPE, NETWORK_ZONES_READ_SCOPE, ROLES_READ_SCOPE] + ) + self.known_network_zone_ids: set[str] = self._list_known_network_zone_ids() + self.api_tokens: dict[str, OktaApiToken] = self._list_api_tokens() + + def _list_api_tokens(self) -> dict[str, "OktaApiToken"]: + """List active API token metadata and owner roles.""" + if API_TOKENS_READ_SCOPE in self.missing_scopes: + logger.warning( + "ApiToken - Skipping API Tokens API call because required " + f"scope is missing: {API_TOKENS_READ_SCOPE}" + ) + return {} + logger.info("ApiToken - Listing Okta API tokens...") + try: + return self._run(self._fetch_api_tokens()) + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + return {} + + async def _fetch_api_tokens(self) -> dict[str, "OktaApiToken"]: + result: dict[str, OktaApiToken] = {} + items, _resp, err = _normalise_sdk_result(await self.client.list_api_tokens()) + if err is not None: + logger.error(f"Error listing API tokens: {err}") + return result + + for token in items: + token_id = _value(getattr(token, "id", None)) + user_id = _value(getattr(token, "user_id", None)) + roles = await self._fetch_user_role_types(user_id) if user_id else [] + network = getattr(token, "network", None) + token_obj = OktaApiToken( + id=token_id, + name=_value(getattr(token, "name", None)) or token_id, + client_name=_value(getattr(token, "client_name", None)), + user_id=user_id, + network_connection=_value(getattr(network, "connection", None)), + network_includes=list(getattr(network, "include", None) or []), + network_excludes=list(getattr(network, "exclude", None) or []), + owner_roles=roles, + ) + result[token_obj.id] = token_obj + return result + + async def _fetch_user_role_types(self, user_id: str) -> list[str]: + """Return normalized admin role types assigned to the token owner.""" + if ROLES_READ_SCOPE in self.missing_scopes: + logger.warning( + "ApiToken - Skipping assigned role lookup for token owner " + f"{user_id} because required scope is missing: {ROLES_READ_SCOPE}" + ) + return [] + items, _resp, err = _normalise_sdk_result( + await self.client.list_assigned_roles_for_user(user_id) + ) + if err is not None: + logger.error(f"Error listing roles for token owner {user_id}: {err}") + return [] + roles = [] + for role in items: + role_type = _value(getattr(role, "type", None)) + role_label = _value(getattr(role, "label", None)) + roles.append(role_type or role_label) + return [role for role in roles if role] + + def _list_known_network_zone_ids(self) -> set[str]: + """List known Network Zone ids and names for token condition validation.""" + if API_TOKENS_READ_SCOPE in self.missing_scopes: + logger.warning( + "ApiToken - Skipping Network Zones API call because API token " + f"listing is unavailable without {API_TOKENS_READ_SCOPE}." + ) + return set() + if NETWORK_ZONES_READ_SCOPE in self.missing_scopes: + logger.warning( + "ApiToken - Skipping Network Zones API call because required " + f"scope is missing: {NETWORK_ZONES_READ_SCOPE}" + ) + return set() + logger.info("ApiToken - Listing Network Zones for token restrictions...") + try: + return self._run(self._fetch_known_network_zone_ids()) + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + return set() + + async def _fetch_known_network_zone_ids(self) -> set[str]: + identifiers: set[str] = set() + items, err = await self._fetch_all_network_zones() + if err is not None: + logger.error(f"Error listing Network Zones for API token checks: {err}") + return identifiers + for zone in items: + zone_id = _value(getattr(zone, "id", None)) + zone_name = _value(getattr(zone, "name", None)) + if zone_id: + identifiers.add(zone_id) + if zone_name: + identifiers.add(zone_name) + return identifiers + + async def _fetch_all_network_zones(self) -> tuple[list, object]: + """Drain all Network Zone pages for API token reference validation.""" + all_items = [] + result = await self.client.list_network_zones(after=None, limit=200) + items, resp, err = _normalise_sdk_result(result) + if err is not None: + return [], err + all_items.extend(items) + while True: + cursor = _next_after_cursor(resp) + if not cursor: + break + result = await self.client.list_network_zones(after=cursor, limit=200) + items, resp, err = _normalise_sdk_result(result) + if err is not None: + return all_items, err + all_items.extend(items) + return all_items, None + + +class OktaApiToken(BaseModel): + """Normalized Okta API token metadata used by checks.""" + + id: str + name: str + client_name: str = "" + user_id: str = "" + network_connection: str = "" + network_includes: list[str] = Field(default_factory=list) + network_excludes: list[str] = Field(default_factory=list) + owner_roles: list[str] = Field(default_factory=list) + + +class ApiTokenSummary(BaseModel): + """Synthetic resource for org-level API token findings.""" + + id: str = "okta-api-tokens" + name: str = "Okta API Tokens" diff --git a/prowler/providers/okta/services/apitoken/apitoken_not_super_admin/__init__.py b/prowler/providers/okta/services/apitoken/apitoken_not_super_admin/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/apitoken/apitoken_not_super_admin/apitoken_not_super_admin.metadata.json b/prowler/providers/okta/services/apitoken/apitoken_not_super_admin/apitoken_not_super_admin.metadata.json new file mode 100644 index 0000000000..212230523b --- /dev/null +++ b/prowler/providers/okta/services/apitoken/apitoken_not_super_admin/apitoken_not_super_admin.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "okta", + "CheckID": "apitoken_not_super_admin", + "CheckTitle": "Okta API tokens are not owned by Super Admin users", + "CheckType": [], + "ServiceName": "apitoken", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "NotDefined", + "ResourceGroup": "IAM", + "Description": "Prowler evaluates whether visible Okta API tokens are owned by users without the Super Admin role, because API tokens inherit the administrative permissions of their owner.", + "Risk": "When an API token is owned by a Super Admin, exposure of that token may grant broad organization administration capability to an attacker.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://developer.okta.com/docs/guides/api-security/", + "https://developer.okta.com/docs/reference/api/roles/" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "Create a dedicated service account, assign only required admin roles, rotate the API token, and revoke Super Admin-owned tokens.", + "Terraform": "" + }, + "Recommendation": { + "Text": "Review API token owners, move tokens to dedicated service accounts with least-privilege admin roles, rotate affected tokens, and revoke Super Admin-owned tokens.", + "Url": "https://hub.prowler.com/check/apitoken_not_super_admin" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/okta/services/apitoken/apitoken_not_super_admin/apitoken_not_super_admin.py b/prowler/providers/okta/services/apitoken/apitoken_not_super_admin/apitoken_not_super_admin.py new file mode 100644 index 0000000000..0d260e742c --- /dev/null +++ b/prowler/providers/okta/services/apitoken/apitoken_not_super_admin/apitoken_not_super_admin.py @@ -0,0 +1,60 @@ +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.lib.service.scope import missing_scope_finding +from prowler.providers.okta.services.apitoken.api_token_client import api_token_client +from prowler.providers.okta.services.apitoken.api_token_service import ( + API_TOKENS_READ_SCOPE, + ROLES_READ_SCOPE, +) +from prowler.providers.okta.services.apitoken.lib.api_token_helpers import ( + owner_has_super_admin, +) + + +class apitoken_not_super_admin(Check): + """Ensure Okta API tokens are not owned by Super Admin users.""" + + def execute(self) -> list[CheckReportOkta]: + """Evaluate every active API token owner's assigned admin roles.""" + org_domain = api_token_client.provider.identity.org_domain + missing_scopes = [ + scope + for scope in (API_TOKENS_READ_SCOPE, ROLES_READ_SCOPE) + if scope in api_token_client.missing_scopes + ] + if missing_scopes: + return [ + missing_scope_finding( + metadata=self.metadata(), + org_domain=org_domain, + resource_id="okta-api-tokens", + resource_name="Okta API Tokens", + missing_scopes=missing_scopes, + action="evaluate API token owner admin roles", + ) + ] + + findings: list[CheckReportOkta] = [] + for token in api_token_client.api_tokens.values(): + report = CheckReportOkta( + metadata=self.metadata(), resource=token, org_domain=org_domain + ) + if owner_has_super_admin(token): + report.status = "FAIL" + report.status_extended = ( + f"API token '{token.name}' is owned by user '{token.user_id}' " + "with the Super Admin role. Use a dedicated service account " + "with least-privilege admin roles instead." + ) + else: + roles = ( + ", ".join(token.owner_roles) + if token.owner_roles + else "no admin roles returned" + ) + report.status = "PASS" + report.status_extended = ( + f"API token '{token.name}' owner '{token.user_id}' is not " + f"assigned Super Admin ({roles})." + ) + findings.append(report) + return findings diff --git a/prowler/providers/okta/services/apitoken/apitoken_restricted_to_network_zone/__init__.py b/prowler/providers/okta/services/apitoken/apitoken_restricted_to_network_zone/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/apitoken/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone.metadata.json b/prowler/providers/okta/services/apitoken/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone.metadata.json new file mode 100644 index 0000000000..fa4885e002 --- /dev/null +++ b/prowler/providers/okta/services/apitoken/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "okta", + "CheckID": "apitoken_restricted_to_network_zone", + "CheckTitle": "Okta API tokens are restricted to known Network Zones", + "CheckType": [], + "ServiceName": "apitoken", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "NotDefined", + "ResourceGroup": "IAM", + "Description": "Prowler evaluates whether visible Okta API tokens are constrained to known Network Zones rather than being usable from Any IP.", + "Risk": "When an API token can be used from Any IP or references unknown Network Zones, an exposed token may be replayed from attacker-controlled infrastructure.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://developer.okta.com/docs/reference/api/api-tokens/", + "https://help.okta.com/oie/en-us/content/topics/security/api.htm" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "Security > API > Tokens: edit each token Security section and select specific Network Zones instead of Any IP.", + "Terraform": "" + }, + "Recommendation": { + "Text": "Review each Okta API token and restrict it to one or more known IP-based Network Zones instead of Any IP.", + "Url": "https://hub.prowler.com/check/apitoken_restricted_to_network_zone" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/okta/services/apitoken/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone.py b/prowler/providers/okta/services/apitoken/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone.py new file mode 100644 index 0000000000..98c98f6b38 --- /dev/null +++ b/prowler/providers/okta/services/apitoken/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone.py @@ -0,0 +1,45 @@ +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.lib.service.scope import missing_scope_finding +from prowler.providers.okta.services.apitoken.api_token_client import api_token_client +from prowler.providers.okta.services.apitoken.api_token_service import ( + API_TOKENS_READ_SCOPE, + NETWORK_ZONES_READ_SCOPE, +) +from prowler.providers.okta.services.apitoken.lib.api_token_helpers import ( + network_zone_restriction_status, +) + + +class apitoken_restricted_to_network_zone(Check): + """Ensure Okta API tokens are restricted to known Network Zones.""" + + def execute(self) -> list[CheckReportOkta]: + """Evaluate every active API token's network condition.""" + org_domain = api_token_client.provider.identity.org_domain + missing_scopes = [ + scope + for scope in (API_TOKENS_READ_SCOPE, NETWORK_ZONES_READ_SCOPE) + if scope in api_token_client.missing_scopes + ] + if missing_scopes: + return [ + missing_scope_finding( + metadata=self.metadata(), + org_domain=org_domain, + resource_id="okta-api-tokens", + resource_name="Okta API Tokens", + missing_scopes=missing_scopes, + action="evaluate API token Network Zone restrictions", + ) + ] + + findings: list[CheckReportOkta] = [] + for token in api_token_client.api_tokens.values(): + report = CheckReportOkta( + metadata=self.metadata(), resource=token, org_domain=org_domain + ) + report.status, report.status_extended = network_zone_restriction_status( + token, api_token_client.known_network_zone_ids + ) + findings.append(report) + return findings diff --git a/prowler/providers/okta/services/apitoken/lib/__init__.py b/prowler/providers/okta/services/apitoken/lib/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/apitoken/lib/api_token_helpers.py b/prowler/providers/okta/services/apitoken/lib/api_token_helpers.py new file mode 100644 index 0000000000..01433a5997 --- /dev/null +++ b/prowler/providers/okta/services/apitoken/lib/api_token_helpers.py @@ -0,0 +1,49 @@ +from prowler.providers.okta.services.apitoken.api_token_service import OktaApiToken + +ANYWHERE_CONNECTIONS = {"", "ANYWHERE", "ANY_IP"} + + +def network_zone_restriction_status( + token: OktaApiToken, known_network_zone_ids: set[str] +) -> tuple[str, str]: + """Evaluate whether an API token is restricted to known Network Zones.""" + connection = token.network_connection.upper() + if connection in ANYWHERE_CONNECTIONS: + return ( + "FAIL", + f"API token '{token.name}' can be used from any IP address. " + "Restrict the token to one or more known Okta Network Zones.", + ) + + referenced_zones = token.network_includes + token.network_excludes + if not referenced_zones: + return ( + "FAIL", + f"API token '{token.name}' is not open to Any IP, but it does not " + "reference a specific Okta Network Zone.", + ) + + unknown_zones = [ + zone for zone in referenced_zones if zone not in known_network_zone_ids + ] + if unknown_zones: + return ( + "FAIL", + f"API token '{token.name}' references unknown Network Zone(s): " + f"{', '.join(unknown_zones)}.", + ) + + return ( + "PASS", + f"API token '{token.name}' is restricted to known Okta Network Zone(s): " + f"{', '.join(referenced_zones)}.", + ) + + +def owner_has_super_admin(token: OktaApiToken) -> bool: + """Return True when any token owner role is Super Admin.""" + for role in token.owner_roles: + normalized = role.strip().replace(" ", "_").upper() + if normalized in {"SUPER_ADMIN", "SUPER_ADMINISTRATOR"}: + return True + return False diff --git a/prowler/providers/okta/services/authenticator/__init__.py b/prowler/providers/okta/services/authenticator/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/authenticator/authenticator_client.py b/prowler/providers/okta/services/authenticator/authenticator_client.py new file mode 100644 index 0000000000..3657a2821e --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_client.py @@ -0,0 +1,6 @@ +from prowler.providers.common.provider import Provider +from prowler.providers.okta.services.authenticator.authenticator_service import ( + Authenticator, +) + +authenticator_client = Authenticator(Provider.get_global_provider()) diff --git a/prowler/providers/okta/services/authenticator/authenticator_okta_verify_fips_compliant/__init__.py b/prowler/providers/okta/services/authenticator/authenticator_okta_verify_fips_compliant/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/authenticator/authenticator_okta_verify_fips_compliant/authenticator_okta_verify_fips_compliant.metadata.json b/prowler/providers/okta/services/authenticator/authenticator_okta_verify_fips_compliant/authenticator_okta_verify_fips_compliant.metadata.json new file mode 100644 index 0000000000..8b50eed14b --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_okta_verify_fips_compliant/authenticator_okta_verify_fips_compliant.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "okta", + "CheckID": "authenticator_okta_verify_fips_compliant", + "CheckTitle": "Okta Verify requires FIPS-compliant devices", + "CheckType": [], + "ServiceName": "authenticator", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "NotDefined", + "ResourceGroup": "IAM", + "Description": "Prowler evaluates whether the Okta Verify authenticator requires FIPS-compliant devices for enrollment where FIPS-compliant authenticator usage is required.", + "Risk": "When Okta Verify enrollment allows non-FIPS devices, users may authenticate with devices that do not meet the expected cryptographic assurance level.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://help.okta.com/en-us/content/topics/mobile/ov-admin-config.htm", + "https://support.okta.com/help/s/article/How-to-Enable-FIPS-Encryption-on-Okta-Verify" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "Security > Authenticators > Okta Verify: set FIPS Compliance to users enrolling in Okta Verify can use FIPS compliant devices only.", + "Terraform": "" + }, + "Recommendation": { + "Text": "Review the Okta Verify authenticator settings and enable FIPS compliance so enrollment is limited to FIPS-compliant devices.", + "Url": "https://hub.prowler.com/check/authenticator_okta_verify_fips_compliant" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/okta/services/authenticator/authenticator_okta_verify_fips_compliant/authenticator_okta_verify_fips_compliant.py b/prowler/providers/okta/services/authenticator/authenticator_okta_verify_fips_compliant/authenticator_okta_verify_fips_compliant.py new file mode 100644 index 0000000000..4e04b682cc --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_okta_verify_fips_compliant/authenticator_okta_verify_fips_compliant.py @@ -0,0 +1,60 @@ +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.lib.service.scope import missing_scope_finding +from prowler.providers.okta.services.authenticator.authenticator_client import ( + authenticator_client, +) +from prowler.providers.okta.services.authenticator.authenticator_service import ( + AUTHENTICATORS_READ_SCOPE, +) +from prowler.providers.okta.services.authenticator.lib.authenticator_helpers import ( + find_authenticator_by_key, + missing_authenticator_resource, +) + + +class authenticator_okta_verify_fips_compliant(Check): + """Ensure Okta Verify restricts enrollment to FIPS-compliant devices.""" + + def execute(self) -> list[CheckReportOkta]: + """Evaluate Okta Verify FIPS compliance settings.""" + org_domain = authenticator_client.provider.identity.org_domain + if AUTHENTICATORS_READ_SCOPE in authenticator_client.missing_scopes: + return [ + missing_scope_finding( + metadata=self.metadata(), + org_domain=org_domain, + resource_id="okta-authenticators", + resource_name="Okta Authenticators", + missing_scopes=[AUTHENTICATORS_READ_SCOPE], + action="evaluate Okta Verify FIPS compliance settings", + ) + ] + + authenticator = find_authenticator_by_key( + authenticator_client.authenticators, "okta_verify" + ) + resource = authenticator or missing_authenticator_resource( + "okta_verify", "Okta Verify authenticator" + ) + report = CheckReportOkta( + metadata=self.metadata(), resource=resource, org_domain=org_domain + ) + if not authenticator or authenticator.status.upper() != "ACTIVE": + report.status = "FAIL" + report.status_extended = ( + "Okta Verify authenticator is not active or missing." + ) + elif authenticator.fips.upper() == "REQUIRED": + report.status = "PASS" + report.status_extended = ( + "Okta Verify authenticator requires FIPS-compliant devices " + "for enrollment." + ) + else: + report.status = "FAIL" + report.status_extended = ( + "Okta Verify authenticator is active but does not require " + f"FIPS-compliant devices for enrollment (current value: " + f"{authenticator.fips or 'unset'})." + ) + return [report] diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_common_password_check/__init__.py b/prowler/providers/okta/services/authenticator/authenticator_password_common_password_check/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_common_password_check/authenticator_password_common_password_check.metadata.json b/prowler/providers/okta/services/authenticator/authenticator_password_common_password_check/authenticator_password_common_password_check.metadata.json new file mode 100644 index 0000000000..f499bf2d26 --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_password_common_password_check/authenticator_password_common_password_check.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "okta", + "CheckID": "authenticator_password_common_password_check", + "CheckTitle": "Okta password policies check passwords against common-password dictionaries", + "CheckType": [], + "ServiceName": "authenticator", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "NotDefined", + "ResourceGroup": "IAM", + "Description": "Prowler evaluates whether every visible active Okta Password Policy enforces common-password dictionary checks.", + "Risk": "When active password policies are weak or inconsistent, users governed by less strict policies may face increased credential guessing, password reuse, and account takeover risk.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://developer.okta.com/docs/api/openapi/okta-management/management/tags/policy", + "https://help.okta.com/en-us/content/topics/security/policies/configure-password-policies.htm" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "Security > Authentication > Password: edit each active password policy and adjust password settings to the STIG requirement.", + "Terraform": "" + }, + "Recommendation": { + "Text": "Review every active Okta Password Policy and configure password settings to enforce common-password dictionary checks.", + "Url": "https://hub.prowler.com/check/authenticator_password_common_password_check" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_common_password_check/authenticator_password_common_password_check.py b/prowler/providers/okta/services/authenticator/authenticator_password_common_password_check/authenticator_password_common_password_check.py new file mode 100644 index 0000000000..340afb5bc9 --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_password_common_password_check/authenticator_password_common_password_check.py @@ -0,0 +1,24 @@ +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.services.authenticator.authenticator_client import ( + authenticator_client, +) +from prowler.providers.okta.services.authenticator.lib.password_policy_helpers import ( + execute_password_policy_check, +) + + +class authenticator_password_common_password_check(Check): + """Ensure Okta Password Policies enforce the required STIG setting.""" + + def execute(self) -> list[CheckReportOkta]: + """Evaluate all active Okta Password Policies.""" + return execute_password_policy_check( + metadata=self.metadata(), + org_domain=authenticator_client.provider.identity.org_domain, + password_policies=authenticator_client.password_policies, + missing_scopes=authenticator_client.missing_scopes, + field_name="common_password_check", + requirement="common-password dictionary checks", + compliant=lambda value: value is True, + actual_label="common password check enabled", + ) diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_complexity_lowercase/__init__.py b/prowler/providers/okta/services/authenticator/authenticator_password_complexity_lowercase/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_complexity_lowercase/authenticator_password_complexity_lowercase.metadata.json b/prowler/providers/okta/services/authenticator/authenticator_password_complexity_lowercase/authenticator_password_complexity_lowercase.metadata.json new file mode 100644 index 0000000000..f3bc119d73 --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_password_complexity_lowercase/authenticator_password_complexity_lowercase.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "okta", + "CheckID": "authenticator_password_complexity_lowercase", + "CheckTitle": "Okta password policies require lowercase characters", + "CheckType": [], + "ServiceName": "authenticator", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "NotDefined", + "ResourceGroup": "IAM", + "Description": "Prowler evaluates whether every visible active Okta Password Policy enforces at least one lowercase character.", + "Risk": "When active password policies are weak or inconsistent, users governed by less strict policies may face increased credential guessing, password reuse, and account takeover risk.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://developer.okta.com/docs/api/openapi/okta-management/management/tags/policy", + "https://help.okta.com/en-us/content/topics/security/policies/configure-password-policies.htm" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "Security > Authentication > Password: edit each active password policy and adjust password settings to the STIG requirement.", + "Terraform": "" + }, + "Recommendation": { + "Text": "Review every active Okta Password Policy and configure password settings to enforce at least one lowercase character.", + "Url": "https://hub.prowler.com/check/authenticator_password_complexity_lowercase" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_complexity_lowercase/authenticator_password_complexity_lowercase.py b/prowler/providers/okta/services/authenticator/authenticator_password_complexity_lowercase/authenticator_password_complexity_lowercase.py new file mode 100644 index 0000000000..e683c09595 --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_password_complexity_lowercase/authenticator_password_complexity_lowercase.py @@ -0,0 +1,24 @@ +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.services.authenticator.authenticator_client import ( + authenticator_client, +) +from prowler.providers.okta.services.authenticator.lib.password_policy_helpers import ( + execute_password_policy_check, +) + + +class authenticator_password_complexity_lowercase(Check): + """Ensure Okta Password Policies enforce the required STIG setting.""" + + def execute(self) -> list[CheckReportOkta]: + """Evaluate all active Okta Password Policies.""" + return execute_password_policy_check( + metadata=self.metadata(), + org_domain=authenticator_client.provider.identity.org_domain, + password_policies=authenticator_client.password_policies, + missing_scopes=authenticator_client.missing_scopes, + field_name="min_lower_case", + requirement="at least one lowercase character", + compliant=lambda value: value is not None and value >= 1, + actual_label="minimum lowercase characters", + ) diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_complexity_number/__init__.py b/prowler/providers/okta/services/authenticator/authenticator_password_complexity_number/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_complexity_number/authenticator_password_complexity_number.metadata.json b/prowler/providers/okta/services/authenticator/authenticator_password_complexity_number/authenticator_password_complexity_number.metadata.json new file mode 100644 index 0000000000..9b4e56166f --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_password_complexity_number/authenticator_password_complexity_number.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "okta", + "CheckID": "authenticator_password_complexity_number", + "CheckTitle": "Okta password policies require numeric characters", + "CheckType": [], + "ServiceName": "authenticator", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "NotDefined", + "ResourceGroup": "IAM", + "Description": "Prowler evaluates whether every visible active Okta Password Policy enforces at least one numeric character.", + "Risk": "When active password policies are weak or inconsistent, users governed by less strict policies may face increased credential guessing, password reuse, and account takeover risk.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://developer.okta.com/docs/api/openapi/okta-management/management/tags/policy", + "https://help.okta.com/en-us/content/topics/security/policies/configure-password-policies.htm" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "Security > Authentication > Password: edit each active password policy and adjust password settings to the STIG requirement.", + "Terraform": "" + }, + "Recommendation": { + "Text": "Review every active Okta Password Policy and configure password settings to enforce at least one numeric character.", + "Url": "https://hub.prowler.com/check/authenticator_password_complexity_number" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_complexity_number/authenticator_password_complexity_number.py b/prowler/providers/okta/services/authenticator/authenticator_password_complexity_number/authenticator_password_complexity_number.py new file mode 100644 index 0000000000..fc605c20b2 --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_password_complexity_number/authenticator_password_complexity_number.py @@ -0,0 +1,24 @@ +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.services.authenticator.authenticator_client import ( + authenticator_client, +) +from prowler.providers.okta.services.authenticator.lib.password_policy_helpers import ( + execute_password_policy_check, +) + + +class authenticator_password_complexity_number(Check): + """Ensure Okta Password Policies enforce the required STIG setting.""" + + def execute(self) -> list[CheckReportOkta]: + """Evaluate all active Okta Password Policies.""" + return execute_password_policy_check( + metadata=self.metadata(), + org_domain=authenticator_client.provider.identity.org_domain, + password_policies=authenticator_client.password_policies, + missing_scopes=authenticator_client.missing_scopes, + field_name="min_number", + requirement="at least one numeric character", + compliant=lambda value: value is not None and value >= 1, + actual_label="minimum numeric characters", + ) diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_complexity_symbol/__init__.py b/prowler/providers/okta/services/authenticator/authenticator_password_complexity_symbol/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_complexity_symbol/authenticator_password_complexity_symbol.metadata.json b/prowler/providers/okta/services/authenticator/authenticator_password_complexity_symbol/authenticator_password_complexity_symbol.metadata.json new file mode 100644 index 0000000000..7d3bae7390 --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_password_complexity_symbol/authenticator_password_complexity_symbol.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "okta", + "CheckID": "authenticator_password_complexity_symbol", + "CheckTitle": "Okta password policies require symbol characters", + "CheckType": [], + "ServiceName": "authenticator", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "NotDefined", + "ResourceGroup": "IAM", + "Description": "Prowler evaluates whether every visible active Okta Password Policy enforces at least one symbol character.", + "Risk": "When active password policies are weak or inconsistent, users governed by less strict policies may face increased credential guessing, password reuse, and account takeover risk.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://developer.okta.com/docs/api/openapi/okta-management/management/tags/policy", + "https://help.okta.com/en-us/content/topics/security/policies/configure-password-policies.htm" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "Security > Authentication > Password: edit each active password policy and adjust password settings to the STIG requirement.", + "Terraform": "" + }, + "Recommendation": { + "Text": "Review every active Okta Password Policy and configure password settings to enforce at least one symbol character.", + "Url": "https://hub.prowler.com/check/authenticator_password_complexity_symbol" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_complexity_symbol/authenticator_password_complexity_symbol.py b/prowler/providers/okta/services/authenticator/authenticator_password_complexity_symbol/authenticator_password_complexity_symbol.py new file mode 100644 index 0000000000..f2ccd09fdc --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_password_complexity_symbol/authenticator_password_complexity_symbol.py @@ -0,0 +1,24 @@ +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.services.authenticator.authenticator_client import ( + authenticator_client, +) +from prowler.providers.okta.services.authenticator.lib.password_policy_helpers import ( + execute_password_policy_check, +) + + +class authenticator_password_complexity_symbol(Check): + """Ensure Okta Password Policies enforce the required STIG setting.""" + + def execute(self) -> list[CheckReportOkta]: + """Evaluate all active Okta Password Policies.""" + return execute_password_policy_check( + metadata=self.metadata(), + org_domain=authenticator_client.provider.identity.org_domain, + password_policies=authenticator_client.password_policies, + missing_scopes=authenticator_client.missing_scopes, + field_name="min_symbol", + requirement="at least one symbol character", + compliant=lambda value: value is not None and value >= 1, + actual_label="minimum symbol characters", + ) diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_complexity_uppercase/__init__.py b/prowler/providers/okta/services/authenticator/authenticator_password_complexity_uppercase/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_complexity_uppercase/authenticator_password_complexity_uppercase.metadata.json b/prowler/providers/okta/services/authenticator/authenticator_password_complexity_uppercase/authenticator_password_complexity_uppercase.metadata.json new file mode 100644 index 0000000000..61307c3d8f --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_password_complexity_uppercase/authenticator_password_complexity_uppercase.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "okta", + "CheckID": "authenticator_password_complexity_uppercase", + "CheckTitle": "Okta password policies require uppercase characters", + "CheckType": [], + "ServiceName": "authenticator", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "NotDefined", + "ResourceGroup": "IAM", + "Description": "Prowler evaluates whether every visible active Okta Password Policy enforces at least one uppercase character.", + "Risk": "When active password policies are weak or inconsistent, users governed by less strict policies may face increased credential guessing, password reuse, and account takeover risk.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://developer.okta.com/docs/api/openapi/okta-management/management/tags/policy", + "https://help.okta.com/en-us/content/topics/security/policies/configure-password-policies.htm" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "Security > Authentication > Password: edit each active password policy and adjust password settings to the STIG requirement.", + "Terraform": "" + }, + "Recommendation": { + "Text": "Review every active Okta Password Policy and configure password settings to enforce at least one uppercase character.", + "Url": "https://hub.prowler.com/check/authenticator_password_complexity_uppercase" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_complexity_uppercase/authenticator_password_complexity_uppercase.py b/prowler/providers/okta/services/authenticator/authenticator_password_complexity_uppercase/authenticator_password_complexity_uppercase.py new file mode 100644 index 0000000000..4c86bd6460 --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_password_complexity_uppercase/authenticator_password_complexity_uppercase.py @@ -0,0 +1,24 @@ +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.services.authenticator.authenticator_client import ( + authenticator_client, +) +from prowler.providers.okta.services.authenticator.lib.password_policy_helpers import ( + execute_password_policy_check, +) + + +class authenticator_password_complexity_uppercase(Check): + """Ensure Okta Password Policies enforce the required STIG setting.""" + + def execute(self) -> list[CheckReportOkta]: + """Evaluate all active Okta Password Policies.""" + return execute_password_policy_check( + metadata=self.metadata(), + org_domain=authenticator_client.provider.identity.org_domain, + password_policies=authenticator_client.password_policies, + missing_scopes=authenticator_client.missing_scopes, + field_name="min_upper_case", + requirement="at least one uppercase character", + compliant=lambda value: value is not None and value >= 1, + actual_label="minimum uppercase characters", + ) diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_history_5/__init__.py b/prowler/providers/okta/services/authenticator/authenticator_password_history_5/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_history_5/authenticator_password_history_5.metadata.json b/prowler/providers/okta/services/authenticator/authenticator_password_history_5/authenticator_password_history_5.metadata.json new file mode 100644 index 0000000000..3bffd2fbb9 --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_password_history_5/authenticator_password_history_5.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "okta", + "CheckID": "authenticator_password_history_5", + "CheckTitle": "Okta password policies remember at least 5 previous passwords", + "CheckType": [], + "ServiceName": "authenticator", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "NotDefined", + "ResourceGroup": "IAM", + "Description": "Prowler evaluates whether every visible active Okta Password Policy enforces password history of at least 5 previous passwords.", + "Risk": "When active password policies are weak or inconsistent, users governed by less strict policies may face increased credential guessing, password reuse, and account takeover risk.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://developer.okta.com/docs/api/openapi/okta-management/management/tags/policy", + "https://help.okta.com/en-us/content/topics/security/policies/configure-password-policies.htm" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "Security > Authentication > Password: edit each active password policy and adjust password settings to the STIG requirement.", + "Terraform": "" + }, + "Recommendation": { + "Text": "Review every active Okta Password Policy and configure password settings to enforce password history of at least 5 previous passwords.", + "Url": "https://hub.prowler.com/check/authenticator_password_history_5" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_history_5/authenticator_password_history_5.py b/prowler/providers/okta/services/authenticator/authenticator_password_history_5/authenticator_password_history_5.py new file mode 100644 index 0000000000..6e63676d18 --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_password_history_5/authenticator_password_history_5.py @@ -0,0 +1,24 @@ +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.services.authenticator.authenticator_client import ( + authenticator_client, +) +from prowler.providers.okta.services.authenticator.lib.password_policy_helpers import ( + execute_password_policy_check, +) + + +class authenticator_password_history_5(Check): + """Ensure Okta Password Policies enforce the required STIG setting.""" + + def execute(self) -> list[CheckReportOkta]: + """Evaluate all active Okta Password Policies.""" + return execute_password_policy_check( + metadata=self.metadata(), + org_domain=authenticator_client.provider.identity.org_domain, + password_policies=authenticator_client.password_policies, + missing_scopes=authenticator_client.missing_scopes, + field_name="history_count", + requirement="password history of at least 5 previous passwords", + compliant=lambda value: value is not None and value >= 5, + actual_label="password history count", + ) diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_lockout_threshold_3/__init__.py b/prowler/providers/okta/services/authenticator/authenticator_password_lockout_threshold_3/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_lockout_threshold_3/authenticator_password_lockout_threshold_3.metadata.json b/prowler/providers/okta/services/authenticator/authenticator_password_lockout_threshold_3/authenticator_password_lockout_threshold_3.metadata.json new file mode 100644 index 0000000000..27c7f95a9f --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_password_lockout_threshold_3/authenticator_password_lockout_threshold_3.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "okta", + "CheckID": "authenticator_password_lockout_threshold_3", + "CheckTitle": "Okta password policies lock accounts after 3 or fewer failed attempts", + "CheckType": [], + "ServiceName": "authenticator", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "NotDefined", + "ResourceGroup": "IAM", + "Description": "Prowler evaluates whether every visible active Okta Password Policy enforces password lockout after 3 or fewer failed attempts.", + "Risk": "When active password policies are weak or inconsistent, users governed by less strict policies may face increased credential guessing, password reuse, and account takeover risk.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://developer.okta.com/docs/api/openapi/okta-management/management/tags/policy", + "https://help.okta.com/en-us/content/topics/security/policies/configure-password-policies.htm" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "Security > Authentication > Password: edit each active password policy and adjust password settings to the STIG requirement.", + "Terraform": "" + }, + "Recommendation": { + "Text": "Review every active Okta Password Policy and configure password settings to enforce password lockout after 3 or fewer failed attempts.", + "Url": "https://hub.prowler.com/check/authenticator_password_lockout_threshold_3" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_lockout_threshold_3/authenticator_password_lockout_threshold_3.py b/prowler/providers/okta/services/authenticator/authenticator_password_lockout_threshold_3/authenticator_password_lockout_threshold_3.py new file mode 100644 index 0000000000..8db77f3f85 --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_password_lockout_threshold_3/authenticator_password_lockout_threshold_3.py @@ -0,0 +1,24 @@ +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.services.authenticator.authenticator_client import ( + authenticator_client, +) +from prowler.providers.okta.services.authenticator.lib.password_policy_helpers import ( + execute_password_policy_check, +) + + +class authenticator_password_lockout_threshold_3(Check): + """Ensure Okta Password Policies enforce the required STIG setting.""" + + def execute(self) -> list[CheckReportOkta]: + """Evaluate all active Okta Password Policies.""" + return execute_password_policy_check( + metadata=self.metadata(), + org_domain=authenticator_client.provider.identity.org_domain, + password_policies=authenticator_client.password_policies, + missing_scopes=authenticator_client.missing_scopes, + field_name="max_attempts", + requirement="password lockout after 3 or fewer failed attempts", + compliant=lambda value: value is not None and value <= 3, + actual_label="maximum failed attempts", + ) diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_maximum_age_60d/__init__.py b/prowler/providers/okta/services/authenticator/authenticator_password_maximum_age_60d/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_maximum_age_60d/authenticator_password_maximum_age_60d.metadata.json b/prowler/providers/okta/services/authenticator/authenticator_password_maximum_age_60d/authenticator_password_maximum_age_60d.metadata.json new file mode 100644 index 0000000000..cec6a9be00 --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_password_maximum_age_60d/authenticator_password_maximum_age_60d.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "okta", + "CheckID": "authenticator_password_maximum_age_60d", + "CheckTitle": "Okta password policies enforce a maximum password age of 60 days", + "CheckType": [], + "ServiceName": "authenticator", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "NotDefined", + "ResourceGroup": "IAM", + "Description": "Prowler evaluates whether every visible active Okta Password Policy enforces maximum password age of 60 days or less.", + "Risk": "When active password policies are weak or inconsistent, users governed by less strict policies may face increased credential guessing, password reuse, and account takeover risk.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://developer.okta.com/docs/api/openapi/okta-management/management/tags/policy", + "https://help.okta.com/en-us/content/topics/security/policies/configure-password-policies.htm" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "Security > Authentication > Password: edit each active password policy and adjust password settings to the STIG requirement.", + "Terraform": "" + }, + "Recommendation": { + "Text": "Review every active Okta Password Policy and configure password settings to enforce maximum password age of 60 days or less.", + "Url": "https://hub.prowler.com/check/authenticator_password_maximum_age_60d" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_maximum_age_60d/authenticator_password_maximum_age_60d.py b/prowler/providers/okta/services/authenticator/authenticator_password_maximum_age_60d/authenticator_password_maximum_age_60d.py new file mode 100644 index 0000000000..6f24908890 --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_password_maximum_age_60d/authenticator_password_maximum_age_60d.py @@ -0,0 +1,24 @@ +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.services.authenticator.authenticator_client import ( + authenticator_client, +) +from prowler.providers.okta.services.authenticator.lib.password_policy_helpers import ( + execute_password_policy_check, +) + + +class authenticator_password_maximum_age_60d(Check): + """Ensure Okta Password Policies enforce the required STIG setting.""" + + def execute(self) -> list[CheckReportOkta]: + """Evaluate all active Okta Password Policies.""" + return execute_password_policy_check( + metadata=self.metadata(), + org_domain=authenticator_client.provider.identity.org_domain, + password_policies=authenticator_client.password_policies, + missing_scopes=authenticator_client.missing_scopes, + field_name="max_age_days", + requirement="maximum password age of 60 days or less", + compliant=lambda value: value is not None and 0 < value <= 60, + actual_label="maximum age days", + ) diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_minimum_age_24h/__init__.py b/prowler/providers/okta/services/authenticator/authenticator_password_minimum_age_24h/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_minimum_age_24h/authenticator_password_minimum_age_24h.metadata.json b/prowler/providers/okta/services/authenticator/authenticator_password_minimum_age_24h/authenticator_password_minimum_age_24h.metadata.json new file mode 100644 index 0000000000..eea9defb34 --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_password_minimum_age_24h/authenticator_password_minimum_age_24h.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "okta", + "CheckID": "authenticator_password_minimum_age_24h", + "CheckTitle": "Okta password policies enforce a 24-hour minimum password age", + "CheckType": [], + "ServiceName": "authenticator", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "NotDefined", + "ResourceGroup": "IAM", + "Description": "Prowler evaluates whether every visible active Okta Password Policy enforces minimum password age of at least 24 hours.", + "Risk": "When active password policies are weak or inconsistent, users governed by less strict policies may face increased credential guessing, password reuse, and account takeover risk.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://developer.okta.com/docs/api/openapi/okta-management/management/tags/policy", + "https://help.okta.com/en-us/content/topics/security/policies/configure-password-policies.htm" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "Security > Authentication > Password: edit each active password policy and adjust password settings to the STIG requirement.", + "Terraform": "" + }, + "Recommendation": { + "Text": "Review every active Okta Password Policy and configure password settings to enforce minimum password age of at least 24 hours.", + "Url": "https://hub.prowler.com/check/authenticator_password_minimum_age_24h" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_minimum_age_24h/authenticator_password_minimum_age_24h.py b/prowler/providers/okta/services/authenticator/authenticator_password_minimum_age_24h/authenticator_password_minimum_age_24h.py new file mode 100644 index 0000000000..34ad219974 --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_password_minimum_age_24h/authenticator_password_minimum_age_24h.py @@ -0,0 +1,24 @@ +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.services.authenticator.authenticator_client import ( + authenticator_client, +) +from prowler.providers.okta.services.authenticator.lib.password_policy_helpers import ( + execute_password_policy_check, +) + + +class authenticator_password_minimum_age_24h(Check): + """Ensure Okta Password Policies enforce the required STIG setting.""" + + def execute(self) -> list[CheckReportOkta]: + """Evaluate all active Okta Password Policies.""" + return execute_password_policy_check( + metadata=self.metadata(), + org_domain=authenticator_client.provider.identity.org_domain, + password_policies=authenticator_client.password_policies, + missing_scopes=authenticator_client.missing_scopes, + field_name="min_age_minutes", + requirement="minimum password age of at least 24 hours", + compliant=lambda value: value is not None and value >= 1440, + actual_label="minimum age minutes", + ) diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_minimum_length_15/__init__.py b/prowler/providers/okta/services/authenticator/authenticator_password_minimum_length_15/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_minimum_length_15/authenticator_password_minimum_length_15.metadata.json b/prowler/providers/okta/services/authenticator/authenticator_password_minimum_length_15/authenticator_password_minimum_length_15.metadata.json new file mode 100644 index 0000000000..9a2f57a1fb --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_password_minimum_length_15/authenticator_password_minimum_length_15.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "okta", + "CheckID": "authenticator_password_minimum_length_15", + "CheckTitle": "Okta password policies require at least 15 characters", + "CheckType": [], + "ServiceName": "authenticator", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "NotDefined", + "ResourceGroup": "IAM", + "Description": "Prowler evaluates whether every visible active Okta Password Policy enforces minimum password length of at least 15 characters.", + "Risk": "When active password policies are weak or inconsistent, users governed by less strict policies may face increased credential guessing, password reuse, and account takeover risk.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://developer.okta.com/docs/api/openapi/okta-management/management/tags/policy", + "https://help.okta.com/en-us/content/topics/security/policies/configure-password-policies.htm" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "Security > Authentication > Password: edit each active password policy and adjust password settings to the STIG requirement.", + "Terraform": "" + }, + "Recommendation": { + "Text": "Review every active Okta Password Policy and configure password settings to enforce minimum password length of at least 15 characters.", + "Url": "https://hub.prowler.com/check/authenticator_password_minimum_length_15" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/okta/services/authenticator/authenticator_password_minimum_length_15/authenticator_password_minimum_length_15.py b/prowler/providers/okta/services/authenticator/authenticator_password_minimum_length_15/authenticator_password_minimum_length_15.py new file mode 100644 index 0000000000..e28a243bfd --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_password_minimum_length_15/authenticator_password_minimum_length_15.py @@ -0,0 +1,24 @@ +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.services.authenticator.authenticator_client import ( + authenticator_client, +) +from prowler.providers.okta.services.authenticator.lib.password_policy_helpers import ( + execute_password_policy_check, +) + + +class authenticator_password_minimum_length_15(Check): + """Ensure Okta Password Policies enforce the required STIG setting.""" + + def execute(self) -> list[CheckReportOkta]: + """Evaluate all active Okta Password Policies.""" + return execute_password_policy_check( + metadata=self.metadata(), + org_domain=authenticator_client.provider.identity.org_domain, + password_policies=authenticator_client.password_policies, + missing_scopes=authenticator_client.missing_scopes, + field_name="min_length", + requirement="minimum password length of at least 15 characters", + compliant=lambda value: value is not None and value >= 15, + actual_label="minimum length", + ) diff --git a/prowler/providers/okta/services/authenticator/authenticator_service.py b/prowler/providers/okta/services/authenticator/authenticator_service.py new file mode 100644 index 0000000000..673ed1e8b1 --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_service.py @@ -0,0 +1,246 @@ +from typing import Optional +from urllib.parse import parse_qs, urlparse + +from pydantic import BaseModel + +from prowler.lib.logger import logger +from prowler.providers.okta.lib.service.service import OktaService + +AUTHENTICATORS_READ_SCOPE = "okta.authenticators.read" +POLICIES_READ_SCOPE = "okta.policies.read" + + +def _next_after_cursor(resp) -> Optional[str]: + """Extract the Okta pagination cursor from a Link header.""" + if resp is None: + return None + headers = getattr(resp, "headers", None) or {} + link = headers.get("link") or headers.get("Link") or "" + if not link: + return None + for part in link.split(","): + if 'rel="next"' not in part: + continue + url_segment = part.split(";", 1)[0].strip().lstrip("<").rstrip(">") + cursor = parse_qs(urlparse(url_segment).query).get("after", [None])[0] + if cursor: + return cursor + return None + + +def _normalise_sdk_result(result) -> tuple[list, object, object]: + """Return `(items, response, error)` for Okta SDK list call variants.""" + if isinstance(result, tuple): + err = result[-1] + items = result[0] or [] + resp = result[1] if len(result) >= 3 else None + return list(items), resp, err + return list(result or []), None, None + + +def _value(value) -> str: + """Return plain string values from Okta SDK enums and raw strings.""" + if value is None: + return "" + enum_value = getattr(value, "value", None) + if enum_value is not None: + return str(enum_value) + return str(value) + + +def _int_or_none(value) -> Optional[int]: + if value is None: + return None + try: + return int(value) + except (TypeError, ValueError): + return None + + +def _bool_or_none(value) -> Optional[bool]: + if value is None: + return None + if isinstance(value, bool): + return value + return bool(value) + + +class Authenticator(OktaService): + """Fetches Okta Password Policies and Authenticators for STIG checks.""" + + def __init__(self, provider): + super().__init__(__class__.__name__, provider) + self.missing_scopes: list[str] = self._missing_scopes( + [POLICIES_READ_SCOPE, AUTHENTICATORS_READ_SCOPE] + ) + self.password_policies: dict[str, PasswordPolicy] = ( + self._list_password_policies() + ) + self.authenticators: dict[str, OktaAuthenticator] = self._list_authenticators() + + def _list_password_policies(self) -> dict[str, "PasswordPolicy"]: + """List PASSWORD policies with normalized password settings.""" + if POLICIES_READ_SCOPE in self.missing_scopes: + logger.warning( + "Authenticator - Skipping Policies API call because required " + f"scope is missing: {POLICIES_READ_SCOPE}" + ) + return {} + logger.info("Authenticator - Listing Okta PASSWORD policies...") + try: + return self._run(self._fetch_password_policies()) + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + return {} + + async def _fetch_password_policies(self) -> dict[str, "PasswordPolicy"]: + result: dict[str, PasswordPolicy] = {} + items, err = await self._paginate( + lambda after: self.client.list_policies(type="PASSWORD", after=after) + ) + if err is not None: + logger.error(f"Error listing PASSWORD policies: {err}") + return result + + for policy in items: + policy_obj = self._build_password_policy(policy) + result[policy_obj.id] = policy_obj + return result + + def _list_authenticators(self) -> dict[str, "OktaAuthenticator"]: + """List org authenticators with normalized settings.""" + if AUTHENTICATORS_READ_SCOPE in self.missing_scopes: + logger.warning( + "Authenticator - Skipping Authenticators API call because required " + f"scope is missing: {AUTHENTICATORS_READ_SCOPE}" + ) + return {} + logger.info("Authenticator - Listing Okta authenticators...") + try: + return self._run(self._fetch_authenticators()) + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + return {} + + async def _fetch_authenticators(self) -> dict[str, "OktaAuthenticator"]: + result: dict[str, OktaAuthenticator] = {} + items, _resp, err = _normalise_sdk_result( + await self.client.list_authenticators() + ) + if err is not None: + logger.error(f"Error listing authenticators: {err}") + return result + + for authenticator in items: + auth_obj = self._build_authenticator(authenticator) + result[auth_obj.id] = auth_obj + return result + + @staticmethod + async def _paginate(fetch): + """Drain all pages of an SDK list call using Okta Link headers.""" + all_items = [] + result = await fetch(None) + items, resp, err = _normalise_sdk_result(result) + if err is not None: + return [], err + all_items.extend(items) + while True: + cursor = _next_after_cursor(resp) + if not cursor: + break + result = await fetch(cursor) + items, resp, err = _normalise_sdk_result(result) + if err is not None: + return all_items, err + all_items.extend(items) + return all_items, None + + @staticmethod + def _build_password_policy(policy) -> "PasswordPolicy": + settings = getattr(policy, "settings", None) + password_settings = getattr(settings, "password", None) if settings else None + lockout = ( + getattr(password_settings, "lockout", None) if password_settings else None + ) + complexity = ( + getattr(password_settings, "complexity", None) + if password_settings + else None + ) + dictionary = getattr(complexity, "dictionary", None) if complexity else None + age = getattr(password_settings, "age", None) if password_settings else None + policy_id = _value(getattr(policy, "id", None)) + return PasswordPolicy( + id=policy_id, + name=_value(getattr(policy, "name", None)) or policy_id, + status=_value(getattr(policy, "status", None)), + priority=_int_or_none(getattr(policy, "priority", None)), + is_default=bool(getattr(policy, "system", False)), + max_attempts=_int_or_none(getattr(lockout, "max_attempts", None)), + min_length=_int_or_none(getattr(complexity, "min_length", None)), + min_upper_case=_int_or_none(getattr(complexity, "min_upper_case", None)), + min_lower_case=_int_or_none(getattr(complexity, "min_lower_case", None)), + min_number=_int_or_none(getattr(complexity, "min_number", None)), + min_symbol=_int_or_none(getattr(complexity, "min_symbol", None)), + min_age_minutes=_int_or_none(getattr(age, "min_age_minutes", None)), + max_age_days=_int_or_none(getattr(age, "max_age_days", None)), + history_count=_int_or_none(getattr(age, "history_count", None)), + common_password_check=_bool_or_none(getattr(dictionary, "common", None)), + ) + + @staticmethod + def _build_authenticator(authenticator) -> "OktaAuthenticator": + settings = getattr(authenticator, "settings", None) + compliance = getattr(settings, "compliance", None) if settings else None + auth_id = _value(getattr(authenticator, "id", None)) + return OktaAuthenticator( + id=auth_id, + key=_value(getattr(authenticator, "key", None)), + name=_value(getattr(authenticator, "name", None)) or auth_id, + status=_value(getattr(authenticator, "status", None)), + type=_value(getattr(authenticator, "type", None)), + fips=_value(getattr(compliance, "fips", None)), + ) + + +class PasswordPolicy(BaseModel): + """Normalized Okta Password Policy settings used by checks.""" + + id: str + name: str + status: str = "" + priority: Optional[int] = None + is_default: bool = False + max_attempts: Optional[int] = None + min_length: Optional[int] = None + min_upper_case: Optional[int] = None + min_lower_case: Optional[int] = None + min_number: Optional[int] = None + min_symbol: Optional[int] = None + min_age_minutes: Optional[int] = None + max_age_days: Optional[int] = None + history_count: Optional[int] = None + common_password_check: Optional[bool] = None + + +class OktaAuthenticator(BaseModel): + """Normalized Okta Authenticator settings used by checks.""" + + id: str + key: str + name: str + status: str = "" + type: str = "" + fips: str = "" + + +class AuthenticatorSummary(BaseModel): + """Synthetic resource for org-level authenticator findings.""" + + id: str + name: str diff --git a/prowler/providers/okta/services/authenticator/authenticator_smart_card_active/__init__.py b/prowler/providers/okta/services/authenticator/authenticator_smart_card_active/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/authenticator/authenticator_smart_card_active/authenticator_smart_card_active.metadata.json b/prowler/providers/okta/services/authenticator/authenticator_smart_card_active/authenticator_smart_card_active.metadata.json new file mode 100644 index 0000000000..6eb290369d --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_smart_card_active/authenticator_smart_card_active.metadata.json @@ -0,0 +1,36 @@ +{ + "Provider": "okta", + "CheckID": "authenticator_smart_card_active", + "CheckTitle": "Okta Smart Card authenticator is active", + "CheckType": [], + "ServiceName": "authenticator", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "NotDefined", + "ResourceGroup": "IAM", + "Description": "Prowler evaluates whether the Smart Card IdP authenticator is configured and active for certificate-based authentication scenarios required by policy.", + "Risk": "When Smart Card authentication is unavailable, users may be unable to satisfy certificate-based authentication requirements where they are mandated.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://developer.okta.com/docs/api/openapi/okta-management/management/tags/authenticator" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "Security > Authenticators: add or activate the Smart Card IdP authenticator.", + "Terraform": "" + }, + "Recommendation": { + "Text": "Review Okta Authenticators and configure or activate the Smart Card IdP authenticator where certificate-based authentication is required.", + "Url": "https://hub.prowler.com/check/authenticator_smart_card_active" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/okta/services/authenticator/authenticator_smart_card_active/authenticator_smart_card_active.py b/prowler/providers/okta/services/authenticator/authenticator_smart_card_active/authenticator_smart_card_active.py new file mode 100644 index 0000000000..81a5262706 --- /dev/null +++ b/prowler/providers/okta/services/authenticator/authenticator_smart_card_active/authenticator_smart_card_active.py @@ -0,0 +1,56 @@ +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.lib.service.scope import missing_scope_finding +from prowler.providers.okta.services.authenticator.authenticator_client import ( + authenticator_client, +) +from prowler.providers.okta.services.authenticator.authenticator_service import ( + AUTHENTICATORS_READ_SCOPE, +) +from prowler.providers.okta.services.authenticator.lib.authenticator_helpers import ( + find_authenticator_by_key, + missing_authenticator_resource, +) + + +class authenticator_smart_card_active(Check): + """Ensure the Smart Card IdP authenticator is active.""" + + def execute(self) -> list[CheckReportOkta]: + """Evaluate the Smart Card IdP authenticator status.""" + org_domain = authenticator_client.provider.identity.org_domain + if AUTHENTICATORS_READ_SCOPE in authenticator_client.missing_scopes: + return [ + missing_scope_finding( + metadata=self.metadata(), + org_domain=org_domain, + resource_id="okta-authenticators", + resource_name="Okta Authenticators", + missing_scopes=[AUTHENTICATORS_READ_SCOPE], + action="evaluate Smart Card IdP authenticator status", + ) + ] + + authenticator = find_authenticator_by_key( + authenticator_client.authenticators, "smart_card_idp" + ) + resource = authenticator or missing_authenticator_resource( + "smart_card_idp", "Smart Card IdP authenticator" + ) + report = CheckReportOkta( + metadata=self.metadata(), resource=resource, org_domain=org_domain + ) + if authenticator and authenticator.status.upper() == "ACTIVE": + report.status = "PASS" + report.status_extended = "Smart Card IdP authenticator is ACTIVE." + elif authenticator: + report.status = "FAIL" + report.status_extended = ( + f"Smart Card IdP authenticator is not active; current status is " + f"{authenticator.status}." + ) + else: + report.status = "FAIL" + report.status_extended = ( + "Smart Card IdP authenticator is not active or missing." + ) + return [report] diff --git a/prowler/providers/okta/services/authenticator/lib/__init__.py b/prowler/providers/okta/services/authenticator/lib/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/authenticator/lib/authenticator_helpers.py b/prowler/providers/okta/services/authenticator/lib/authenticator_helpers.py new file mode 100644 index 0000000000..cf49d8c121 --- /dev/null +++ b/prowler/providers/okta/services/authenticator/lib/authenticator_helpers.py @@ -0,0 +1,19 @@ +from prowler.providers.okta.services.authenticator.authenticator_service import ( + AuthenticatorSummary, + OktaAuthenticator, +) + + +def find_authenticator_by_key( + authenticators: dict[str, OktaAuthenticator], key: str +) -> OktaAuthenticator | None: + """Return the first authenticator with the requested key.""" + for authenticator in authenticators.values(): + if authenticator.key == key: + return authenticator + return None + + +def missing_authenticator_resource(key: str, name: str) -> AuthenticatorSummary: + """Build a synthetic resource for a missing authenticator.""" + return AuthenticatorSummary(id=f"{key}-missing", name=name) diff --git a/prowler/providers/okta/services/authenticator/lib/password_policy_helpers.py b/prowler/providers/okta/services/authenticator/lib/password_policy_helpers.py new file mode 100644 index 0000000000..01ed137fc8 --- /dev/null +++ b/prowler/providers/okta/services/authenticator/lib/password_policy_helpers.py @@ -0,0 +1,101 @@ +from collections.abc import Callable + +from prowler.lib.check.models import CheckReportOkta +from prowler.providers.okta.lib.service.scope import missing_scope_finding +from prowler.providers.okta.services.authenticator.authenticator_service import ( + POLICIES_READ_SCOPE, + PasswordPolicy, +) + + +def active_password_policies( + password_policies: dict[str, PasswordPolicy], +) -> list[PasswordPolicy]: + """Return active password policies sorted by priority.""" + return sorted( + [ + policy + for policy in password_policies.values() + if not policy.status or policy.status.upper() == "ACTIVE" + ], + key=lambda policy: ( + policy.priority if policy.priority is not None else float("inf"), + policy.name, + ), + ) + + +def password_policy_label(policy: PasswordPolicy) -> str: + kind = "default" if policy.is_default else "custom" + priority = policy.priority if policy.priority is not None else "unset" + return f"Password Policy '{policy.name}' (priority {priority}, {kind})" + + +def no_active_password_policies_finding( + metadata, org_domain: str, requirement: str +) -> CheckReportOkta: + """Build the FAIL finding emitted when no active password policies exist.""" + placeholder = PasswordPolicy( + id="password-policies-missing", + name="(no active password policies)", + status="MISSING", + ) + report = CheckReportOkta( + metadata=metadata, resource=placeholder, org_domain=org_domain + ) + report.status = "FAIL" + report.status_extended = ( + "No active Okta Password Policies were returned by the API. " + f"The organization must enforce: {requirement}." + ) + return report + + +def execute_password_policy_check( + *, + metadata, + org_domain: str, + password_policies: dict[str, PasswordPolicy], + missing_scopes: list[str], + field_name: str, + requirement: str, + compliant: Callable[[object], bool], + actual_label: str, +) -> list[CheckReportOkta]: + """Evaluate a scalar password-policy setting across all active policies.""" + if POLICIES_READ_SCOPE in missing_scopes: + return [ + missing_scope_finding( + metadata=metadata, + org_domain=org_domain, + resource_id="okta-password-policies", + resource_name="Okta Password Policies", + missing_scopes=[POLICIES_READ_SCOPE], + action="evaluate active Password Policy settings", + ) + ] + + policies = active_password_policies(password_policies) + if not policies: + return [no_active_password_policies_finding(metadata, org_domain, requirement)] + + findings: list[CheckReportOkta] = [] + for policy in policies: + actual = getattr(policy, field_name) + report = CheckReportOkta( + metadata=metadata, resource=policy, org_domain=org_domain + ) + if compliant(actual): + report.status = "PASS" + report.status_extended = ( + f"{password_policy_label(policy)} enforces {requirement} " + f"({actual_label}: {actual})." + ) + else: + report.status = "FAIL" + report.status_extended = ( + f"{password_policy_label(policy)} does not enforce {requirement} " + f"({actual_label}: {actual})." + ) + findings.append(report) + return findings diff --git a/prowler/providers/okta/services/network/__init__.py b/prowler/providers/okta/services/network/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/network/lib/__init__.py b/prowler/providers/okta/services/network/lib/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/network/lib/network_zone_helpers.py b/prowler/providers/okta/services/network/lib/network_zone_helpers.py new file mode 100644 index 0000000000..99e83c13cc --- /dev/null +++ b/prowler/providers/okta/services/network/lib/network_zone_helpers.py @@ -0,0 +1,53 @@ +from prowler.providers.okta.services.network.network_zone_service import OktaNetworkZone + +ANONYMIZER_CATEGORY_MARKERS = ( + "ANONYM", + "PROXY", + "TOR", + "VPN", +) + + +def active_blocklist_zones( + network_zones: dict[str, OktaNetworkZone], +) -> list[OktaNetworkZone]: + """Return active Network Zones configured for blocklist usage.""" + return sorted( + [ + zone + for zone in network_zones.values() + if zone.status.upper() == "ACTIVE" and zone.usage.upper() == "BLOCKLIST" + ], + key=lambda zone: (zone.name, zone.id), + ) + + +def is_ip_blocklist_with_entries(zone: OktaNetworkZone) -> bool: + """Return True when an IP blocklist zone contains gateway/proxy entries.""" + return zone.type.upper() == "IP" and bool(zone.gateways or zone.proxies) + + +def is_enhanced_dynamic_anonymizer_blocklist(zone: OktaNetworkZone) -> bool: + """Return True for active Enhanced Dynamic blocklists covering anonymizers.""" + if zone.type.upper() != "DYNAMIC_V2": + return False + if zone.system and zone.name == "DefaultEnhancedDynamicZone": + return True + categories = [category.upper() for category in zone.ip_service_categories] + return any( + marker in category + for category in categories + for marker in ANONYMIZER_CATEGORY_MARKERS + ) + + +def compliant_anonymized_proxy_blocklist( + network_zones: dict[str, OktaNetworkZone], +) -> tuple[OktaNetworkZone | None, str]: + """Find the Network Zone that satisfies anonymized-proxy blocklisting.""" + for zone in active_blocklist_zones(network_zones): + if is_ip_blocklist_with_entries(zone): + return zone, "active IP blocklist with gateway or proxy IP entries" + if is_enhanced_dynamic_anonymizer_blocklist(zone): + return zone, "active Enhanced Dynamic Zone blocklist for anonymizers" + return None, "" diff --git a/prowler/providers/okta/services/network/network_zone_block_anonymized_proxies/__init__.py b/prowler/providers/okta/services/network/network_zone_block_anonymized_proxies/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/network/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies.metadata.json b/prowler/providers/okta/services/network/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies.metadata.json new file mode 100644 index 0000000000..ef3ba9f289 --- /dev/null +++ b/prowler/providers/okta/services/network/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "okta", + "CheckID": "network_zone_block_anonymized_proxies", + "CheckTitle": "Okta blocks anonymized proxy access with active Network Zone blocklists", + "CheckType": [], + "ServiceName": "network", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "NotDefined", + "ResourceGroup": "network", + "Description": "Prowler evaluates whether Okta has an active Network Zone blocklist that can block anonymized proxy sources, either through gateway/proxy IP entries or an Enhanced Dynamic Zone anonymizer category.", + "Risk": "When anonymized proxy sources are not covered by active Okta Network Zone blocklists, attackers may hide their source network while attempting credential attacks or session establishment from untrusted infrastructure.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://developer.okta.com/docs/api/openapi/okta-management/management/tags/networkzone", + "https://help.okta.com/en-us/content/topics/security/network/about-enhanced-dynamic-zones.htm" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "Security > Networks: configure BlockedIpZone gateway/proxy IP entries or activate DefaultEnhancedDynamicZone / Enhanced Dynamic Zone blocklisting for anonymizers.", + "Terraform": "" + }, + "Recommendation": { + "Text": "Review Okta Network Zones and configure an active IP blocklist or Enhanced Dynamic Zone blocklist that covers anonymizers before authentication.", + "Url": "https://hub.prowler.com/check/network_zone_block_anonymized_proxies" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/okta/services/network/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies.py b/prowler/providers/okta/services/network/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies.py new file mode 100644 index 0000000000..eb607f5f67 --- /dev/null +++ b/prowler/providers/okta/services/network/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies.py @@ -0,0 +1,52 @@ +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.lib.service.scope import missing_scope_finding +from prowler.providers.okta.services.network.lib.network_zone_helpers import ( + compliant_anonymized_proxy_blocklist, +) +from prowler.providers.okta.services.network.network_zone_client import ( + network_zone_client, +) +from prowler.providers.okta.services.network.network_zone_service import ( + NetworkZoneSummary, +) + + +class network_zone_block_anonymized_proxies(Check): + """Ensure Okta actively blocks anonymized proxy sources before auth.""" + + def execute(self) -> list[CheckReportOkta]: + """Evaluate whether an active blocklist covers anonymized proxies.""" + org_domain = network_zone_client.provider.identity.org_domain + if network_zone_client.missing_scopes: + return [ + missing_scope_finding( + metadata=self.metadata(), + org_domain=org_domain, + resource_id="okta-network-zones", + resource_name="Okta Network Zones", + missing_scopes=network_zone_client.missing_scopes, + action="evaluate Network Zone anonymized proxy blocklists", + ) + ] + + matching_zone, reason = compliant_anonymized_proxy_blocklist( + network_zone_client.network_zones + ) + + resource = matching_zone or NetworkZoneSummary() + report = CheckReportOkta( + metadata=self.metadata(), resource=resource, org_domain=org_domain + ) + if matching_zone: + report.status = "PASS" + report.status_extended = ( + f"Okta Network Zone '{matching_zone.name}' is an {reason}." + ) + else: + report.status = "FAIL" + report.status_extended = ( + "No active Okta Network Zone blocklist was found that blocks " + "anonymized proxies. Existing zones do not actively block gateway " + "or proxy IPs, nor an Enhanced Dynamic Zone anonymizer category." + ) + return [report] diff --git a/prowler/providers/okta/services/network/network_zone_client.py b/prowler/providers/okta/services/network/network_zone_client.py new file mode 100644 index 0000000000..6d4f603f80 --- /dev/null +++ b/prowler/providers/okta/services/network/network_zone_client.py @@ -0,0 +1,4 @@ +from prowler.providers.common.provider import Provider +from prowler.providers.okta.services.network.network_zone_service import NetworkZone + +network_zone_client = NetworkZone(Provider.get_global_provider()) diff --git a/prowler/providers/okta/services/network/network_zone_service.py b/prowler/providers/okta/services/network/network_zone_service.py new file mode 100644 index 0000000000..26a7d7a743 --- /dev/null +++ b/prowler/providers/okta/services/network/network_zone_service.py @@ -0,0 +1,152 @@ +from typing import Optional +from urllib.parse import parse_qs, urlparse + +from pydantic import BaseModel, Field + +from prowler.lib.logger import logger +from prowler.providers.okta.lib.service.service import OktaService + +NETWORK_ZONES_READ_SCOPE = "okta.networkZones.read" + + +def _next_after_cursor(resp) -> Optional[str]: + """Extract the Okta pagination cursor from a Link header.""" + if resp is None: + return None + headers = getattr(resp, "headers", None) or {} + link = headers.get("link") or headers.get("Link") or "" + if not link: + return None + for part in link.split(","): + if 'rel="next"' not in part: + continue + url_segment = part.split(";", 1)[0].strip().lstrip("<").rstrip(">") + cursor = parse_qs(urlparse(url_segment).query).get("after", [None])[0] + if cursor: + return cursor + return None + + +def _normalise_sdk_result(result) -> tuple[list, object, object]: + """Return `(items, response, error)` for Okta SDK list call variants.""" + if isinstance(result, tuple): + err = result[-1] + items = result[0] or [] + resp = result[1] if len(result) >= 3 else None + return list(items), resp, err + return list(result or []), None, None + + +def _value(value) -> str: + """Return plain string values from Okta SDK enums and raw strings.""" + if value is None: + return "" + enum_value = getattr(value, "value", None) + if enum_value is not None: + return str(enum_value) + return str(value) + + +class NetworkZone(OktaService): + """Fetches Okta Network Zones for STIG network-zone checks.""" + + def __init__(self, provider): + super().__init__(__class__.__name__, provider) + self.missing_scopes: list[str] = self._missing_scopes( + [NETWORK_ZONES_READ_SCOPE] + ) + self.network_zones: dict[str, OktaNetworkZone] = self._list_network_zones() + + def _list_network_zones(self) -> dict[str, "OktaNetworkZone"]: + """List all Network Zones visible to the configured Okta service app.""" + if self.missing_scopes: + logger.warning( + "NetworkZone - Skipping Network Zones API call because required " + f"scope(s) are missing: {', '.join(self.missing_scopes)}" + ) + return {} + logger.info("NetworkZone - Listing Okta Network Zones...") + try: + return self._run(self._fetch_all()) + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + return {} + + async def _fetch_all(self) -> dict[str, "OktaNetworkZone"]: + result: dict[str, OktaNetworkZone] = {} + all_zones, err = await self._paginate( + lambda after: self.client.list_network_zones(after=after, limit=200) + ) + if err is not None: + logger.error(f"Error listing Network Zones: {err}") + return result + + for zone in all_zones: + zone_obj = self._build_zone(zone) + result[zone_obj.id] = zone_obj + return result + + @staticmethod + async def _paginate(fetch): + """Drain all pages of an SDK list call using Okta Link headers.""" + all_items = [] + result = await fetch(None) + items, resp, err = _normalise_sdk_result(result) + if err is not None: + return [], err + all_items.extend(items) + while True: + cursor = _next_after_cursor(resp) + if not cursor: + break + result = await fetch(cursor) + items, resp, err = _normalise_sdk_result(result) + if err is not None: + return all_items, err + all_items.extend(items) + return all_items, None + + @staticmethod + def _build_zone(zone) -> "OktaNetworkZone": + zone_id = _value(getattr(zone, "id", None)) + return OktaNetworkZone( + id=zone_id, + name=_value(getattr(zone, "name", None)) or zone_id, + status=_value(getattr(zone, "status", None)), + type=_value(getattr(zone, "type", None)), + usage=_value(getattr(zone, "usage", None)), + system=bool(getattr(zone, "system", False)), + gateways=list(getattr(zone, "gateways", None) or []), + proxies=list(getattr(zone, "proxies", None) or []), + asns=list(getattr(zone, "asns", None) or []), + locations=list(getattr(zone, "locations", None) or []), + ip_service_categories=[ + _value(category) + for category in (getattr(zone, "ip_service_categories", None) or []) + ], + ) + + +class OktaNetworkZone(BaseModel): + """Normalized Okta Network Zone attributes used by checks.""" + + id: str + name: str + status: str = "" + type: str = "" + usage: str = "" + system: bool = False + gateways: list[str] = Field(default_factory=list) + proxies: list[str] = Field(default_factory=list) + asns: list[str] = Field(default_factory=list) + locations: list[str] = Field(default_factory=list) + ip_service_categories: list[str] = Field(default_factory=list) + + +class NetworkZoneSummary(BaseModel): + """Synthetic resource for org-level Network Zone findings.""" + + id: str = "okta-network-zones" + name: str = "Okta Network Zones" diff --git a/tests/providers/okta/okta_fixtures.py b/tests/providers/okta/okta_fixtures.py index 2b7ca6927c..4af8145cd4 100644 --- a/tests/providers/okta/okta_fixtures.py +++ b/tests/providers/okta/okta_fixtures.py @@ -11,12 +11,24 @@ def set_mocked_okta_provider( session: OktaSession = None, identity: OktaIdentityInfo = None, audit_config: dict = None, + scopes: list[str] = None, ): if session is None: session = OktaSession( org_domain=OKTA_ORG_DOMAIN, client_id=OKTA_CLIENT_ID, - scopes=["okta.policies.read", "okta.brands.read"], + scopes=( + scopes + if scopes is not None + else [ + "okta.policies.read", + "okta.brands.read", + "okta.networkZones.read", + "okta.apiTokens.read", + "okta.roles.read", + "okta.authenticators.read", + ] + ), private_key=OKTA_PRIVATE_KEY, ) if identity is None: diff --git a/tests/providers/okta/services/api_token/api_token_fixtures.py b/tests/providers/okta/services/api_token/api_token_fixtures.py new file mode 100644 index 0000000000..2d2193e21c --- /dev/null +++ b/tests/providers/okta/services/api_token/api_token_fixtures.py @@ -0,0 +1,41 @@ +from unittest import mock + +from prowler.providers.okta.services.apitoken.api_token_service import OktaApiToken +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider + + +def build_api_token_client( + tokens: dict = None, + known_network_zone_ids: set[str] = None, + missing_scopes: list[str] = None, +): + client = mock.MagicMock() + client.api_tokens = tokens or {} + client.known_network_zone_ids = known_network_zone_ids or {"nzo-corp"} + client.missing_scopes = missing_scopes or [] + client.provider = set_mocked_okta_provider() + return client + + +def api_token( + token_id: str = "00Tabcdefg1234567890", + name: str = "CI token", + *, + user_id: str = "00uabcdefg1234567890", + network_connection: str = "ZONE", + network_includes: list[str] = None, + network_excludes: list[str] = None, + owner_roles: list[str] = None, +): + return OktaApiToken( + id=token_id, + name=name, + client_name="Okta API", + user_id=user_id, + network_connection=network_connection, + network_includes=( + network_includes if network_includes is not None else ["nzo-corp"] + ), + network_excludes=network_excludes or [], + owner_roles=owner_roles or ["READ_ONLY_ADMIN"], + ) diff --git a/tests/providers/okta/services/api_token/api_token_service_test.py b/tests/providers/okta/services/api_token/api_token_service_test.py new file mode 100644 index 0000000000..ece1e6394c --- /dev/null +++ b/tests/providers/okta/services/api_token/api_token_service_test.py @@ -0,0 +1,248 @@ +from types import SimpleNamespace +from unittest import mock + +import pytest + +from prowler.providers.okta.okta_provider import DEFAULT_SCOPES +from prowler.providers.okta.services.apitoken.api_token_service import ( + API_TOKENS_READ_SCOPE, + NETWORK_ZONES_READ_SCOPE, + ROLES_READ_SCOPE, + ApiToken, +) +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider + + +def _resp(headers: dict = None): + return SimpleNamespace(headers=headers or {}) + + +def _sdk_token( + token_id: str = "00Tabcdefg1234567890", + name: str = "CI token", + *, + user_id: str = "00uabcdefg1234567890", + connection: str = "ZONE", + include: list[str] = None, + exclude: list[str] = None, +): + return SimpleNamespace( + id=token_id, + name=name, + client_name="Okta API", + user_id=user_id, + network=SimpleNamespace( + connection=connection, + include=include if include is not None else ["nzo-corp"], + exclude=exclude or [], + ), + ) + + +def _sdk_role(role_type: str): + return SimpleNamespace(type=role_type, label=role_type.replace("_", " ").title()) + + +def _sdk_zone(zone_id: str, name: str): + return SimpleNamespace(id=zone_id, name=name) + + +class Test_ApiToken_service: + def test_fetches_tokens_roles_and_known_network_zones(self): + provider = set_mocked_okta_provider() + token = _sdk_token() + + async def fake_list_api_tokens(): + return ([token], _resp({}), None) + + async def fake_list_assigned_roles_for_user(user_id): + assert user_id == token.user_id + return ([_sdk_role("READ_ONLY_ADMIN")], _resp({}), None) + + async def fake_list_network_zones(after=None, limit=None): + assert after is None + assert limit == 200 + return ([_sdk_zone("nzo-corp", "Corporate")], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_api_tokens = fake_list_api_tokens + mocked.list_assigned_roles_for_user = fake_list_assigned_roles_for_user + mocked.list_network_zones = fake_list_network_zones + mocked_client_cls.return_value = mocked + + service = ApiToken(provider) + + assert set(service.api_tokens.keys()) == {token.id} + assert service.api_tokens[token.id].network_connection == "ZONE" + assert service.api_tokens[token.id].owner_roles == ["READ_ONLY_ADMIN"] + assert service.known_network_zone_ids == {"nzo-corp", "Corporate"} + + def test_role_fetch_error_keeps_token_with_empty_roles(self): + provider = set_mocked_okta_provider() + token = _sdk_token() + + async def fake_list_api_tokens(): + return ([token], _resp({}), None) + + async def fake_roles_error(user_id): + assert user_id == token.user_id + return ([], _resp({}), Exception("forbidden")) + + async def fake_list_network_zones(after=None, limit=None): + assert after is None + assert limit == 200 + return ([], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_api_tokens = fake_list_api_tokens + mocked.list_assigned_roles_for_user = fake_roles_error + mocked.list_network_zones = fake_list_network_zones + mocked_client_cls.return_value = mocked + service = ApiToken(provider) + + assert service.api_tokens[token.id].owner_roles == [] + + def test_paginates_known_network_zones_for_token_validation(self): + provider = set_mocked_okta_provider() + token = _sdk_token(include=["nzo-page-2"]) + next_link = '; rel="next"' + + async def fake_list_api_tokens(): + return ([token], _resp({}), None) + + async def fake_list_assigned_roles_for_user(user_id): + assert user_id == token.user_id + return ([_sdk_role("READ_ONLY_ADMIN")], _resp({}), None) + + async def fake_list_network_zones(after=None, limit=None): + assert limit == 200 + if after is None: + return ( + [_sdk_zone("nzo-page-1", "First")], + _resp({"link": next_link}), + None, + ) + return ([_sdk_zone("nzo-page-2", "Second")], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_api_tokens = fake_list_api_tokens + mocked.list_assigned_roles_for_user = fake_list_assigned_roles_for_user + mocked.list_network_zones = fake_list_network_zones + mocked_client_cls.return_value = mocked + service = ApiToken(provider) + + assert service.known_network_zone_ids == { + "nzo-page-1", + "First", + "nzo-page-2", + "Second", + } + + def test_returns_empty_on_token_api_error(self): + provider = set_mocked_okta_provider() + + async def failing(): + return ([], _resp({}), Exception("forbidden")) + + async def fake_list_network_zones(after=None, limit=None): + assert after is None + assert limit == 200 + return ([], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_api_tokens = failing + mocked.list_network_zones = fake_list_network_zones + mocked_client_cls.return_value = mocked + service = ApiToken(provider) + + assert service.api_tokens == {} + + @pytest.mark.parametrize( + "missing_scope, expected_calls, expected_tokens, expected_zones, expected_roles", + [ + ( + API_TOKENS_READ_SCOPE, + [], + set(), + set(), + None, + ), + ( + NETWORK_ZONES_READ_SCOPE, + ["list_api_tokens", "list_assigned_roles_for_user"], + {"00Tabcdefg1234567890"}, + set(), + ["READ_ONLY_ADMIN"], + ), + ( + ROLES_READ_SCOPE, + ["list_network_zones", "list_api_tokens"], + {"00Tabcdefg1234567890"}, + {"nzo-corp", "Corporate"}, + [], + ), + ], + ) + def test_missing_scope_skips_corresponding_sdk_call( + self, + missing_scope, + expected_calls, + expected_tokens, + expected_zones, + expected_roles, + ): + provider = set_mocked_okta_provider( + scopes=[scope for scope in DEFAULT_SCOPES if scope != missing_scope] + ) + token = _sdk_token() + calls = [] + + async def fake_list_api_tokens(): + if missing_scope == API_TOKENS_READ_SCOPE: + raise AssertionError("list_api_tokens must not be called") + calls.append("list_api_tokens") + return ([token], _resp({}), None) + + async def fake_list_assigned_roles_for_user(user_id): + if missing_scope == ROLES_READ_SCOPE: + raise AssertionError("list_assigned_roles_for_user must not be called") + assert user_id == token.user_id + calls.append("list_assigned_roles_for_user") + return ([_sdk_role("READ_ONLY_ADMIN")], _resp({}), None) + + async def fake_list_network_zones(after=None, limit=None): + if missing_scope in {API_TOKENS_READ_SCOPE, NETWORK_ZONES_READ_SCOPE}: + raise AssertionError("list_network_zones must not be called") + assert after is None + assert limit == 200 + calls.append("list_network_zones") + return ([_sdk_zone("nzo-corp", "Corporate")], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_api_tokens = fake_list_api_tokens + mocked.list_assigned_roles_for_user = fake_list_assigned_roles_for_user + mocked.list_network_zones = fake_list_network_zones + mocked_client_cls.return_value = mocked + service = ApiToken(provider) + + assert service.missing_scopes == [missing_scope] + assert set(service.api_tokens.keys()) == expected_tokens + assert service.known_network_zone_ids == expected_zones + if expected_roles is not None: + assert service.api_tokens[token.id].owner_roles == expected_roles + assert calls == expected_calls diff --git a/tests/providers/okta/services/api_token/apitoken_not_super_admin/apitoken_not_super_admin_test.py b/tests/providers/okta/services/api_token/apitoken_not_super_admin/apitoken_not_super_admin_test.py new file mode 100644 index 0000000000..f9fa6e1e49 --- /dev/null +++ b/tests/providers/okta/services/api_token/apitoken_not_super_admin/apitoken_not_super_admin_test.py @@ -0,0 +1,60 @@ +from unittest import mock + +import pytest + +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider +from tests.providers.okta.services.api_token.api_token_fixtures import ( + api_token, + build_api_token_client, +) + +CHECK_PATH = ( + "prowler.providers.okta.services.apitoken." + "apitoken_not_super_admin.apitoken_not_super_admin.api_token_client" +) + + +def _run_check(api_token_client): + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_okta_provider(), + ), + mock.patch(CHECK_PATH, new=api_token_client), + ): + from prowler.providers.okta.services.apitoken.apitoken_not_super_admin.apitoken_not_super_admin import ( + apitoken_not_super_admin, + ) + + return apitoken_not_super_admin().execute() + + +class Test_apitoken_not_super_admin: + @pytest.mark.parametrize( + "missing_scope", ["okta.apiTokens.read", "okta.roles.read"] + ) + def test_missing_required_scope_returns_manual(self, missing_scope): + findings = _run_check( + build_api_token_client({}, missing_scopes=[missing_scope]) + ) + assert len(findings) == 1 + assert findings[0].status == "MANUAL" + assert missing_scope in findings[0].status_extended + + def test_no_tokens_returns_no_findings(self): + findings = _run_check(build_api_token_client({})) + assert findings == [] + + def test_token_owner_without_super_admin_passes(self): + token = api_token(owner_roles=["READ_ONLY_ADMIN"]) + findings = _run_check(build_api_token_client({token.id: token})) + assert len(findings) == 1 + assert findings[0].status == "PASS" + assert findings[0].resource_id == token.id + + def test_token_owner_with_super_admin_fails(self): + token = api_token(owner_roles=["SUPER_ADMIN"]) + findings = _run_check(build_api_token_client({token.id: token})) + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "Super Admin" in findings[0].status_extended diff --git a/tests/providers/okta/services/api_token/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone_test.py b/tests/providers/okta/services/api_token/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone_test.py new file mode 100644 index 0000000000..4e75b99e77 --- /dev/null +++ b/tests/providers/okta/services/api_token/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone_test.py @@ -0,0 +1,75 @@ +from unittest import mock + +import pytest + +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider +from tests.providers.okta.services.api_token.api_token_fixtures import ( + api_token, + build_api_token_client, +) + +CHECK_PATH = ( + "prowler.providers.okta.services.apitoken." + "apitoken_restricted_to_network_zone.apitoken_restricted_to_network_zone.api_token_client" +) + + +def _run_check(api_token_client): + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_okta_provider(), + ), + mock.patch(CHECK_PATH, new=api_token_client), + ): + from prowler.providers.okta.services.apitoken.apitoken_restricted_to_network_zone.apitoken_restricted_to_network_zone import ( + apitoken_restricted_to_network_zone, + ) + + return apitoken_restricted_to_network_zone().execute() + + +class Test_apitoken_restricted_to_network_zone: + @pytest.mark.parametrize( + "missing_scope", ["okta.apiTokens.read", "okta.networkZones.read"] + ) + def test_missing_required_scope_returns_manual(self, missing_scope): + findings = _run_check( + build_api_token_client({}, missing_scopes=[missing_scope]) + ) + assert len(findings) == 1 + assert findings[0].status == "MANUAL" + assert missing_scope in findings[0].status_extended + + def test_no_tokens_returns_no_findings(self): + findings = _run_check(build_api_token_client({})) + assert findings == [] + + def test_token_restricted_to_known_network_zone_passes(self): + token = api_token(network_connection="ZONE", network_includes=["nzo-corp"]) + findings = _run_check( + build_api_token_client( + {token.id: token}, known_network_zone_ids={"nzo-corp"} + ) + ) + assert len(findings) == 1 + assert findings[0].status == "PASS" + assert findings[0].resource_id == token.id + + def test_token_open_to_anywhere_fails(self): + token = api_token(network_connection="ANYWHERE", network_includes=[]) + findings = _run_check(build_api_token_client({token.id: token})) + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "from any IP" in findings[0].status_extended + + def test_token_restricted_to_unknown_zone_fails(self): + token = api_token(network_connection="ZONE", network_includes=["nzo-missing"]) + findings = _run_check( + build_api_token_client( + {token.id: token}, known_network_zone_ids={"nzo-corp"} + ) + ) + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "unknown Network Zone" in findings[0].status_extended diff --git a/tests/providers/okta/services/authenticator/authenticator_fixtures.py b/tests/providers/okta/services/authenticator/authenticator_fixtures.py new file mode 100644 index 0000000000..fcdae628a3 --- /dev/null +++ b/tests/providers/okta/services/authenticator/authenticator_fixtures.py @@ -0,0 +1,73 @@ +from unittest import mock + +from prowler.providers.okta.services.authenticator.authenticator_service import ( + OktaAuthenticator, + PasswordPolicy, +) +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider + + +def build_authenticator_client( + password_policies: dict = None, + authenticators: dict = None, + missing_scopes: list[str] = None, +): + client = mock.MagicMock() + client.password_policies = password_policies or {} + client.authenticators = authenticators or {} + client.missing_scopes = missing_scopes or [] + client.provider = set_mocked_okta_provider() + return client + + +def password_policy( + policy_id: str = "pol-password", + name: str = "Default Password Policy", + *, + status: str = "ACTIVE", + priority: int = 1, + max_attempts: int = 3, + min_length: int = 15, + min_upper_case: int = 1, + min_lower_case: int = 1, + min_number: int = 1, + min_symbol: int = 1, + min_age_minutes: int = 1440, + max_age_days: int = 60, + history_count: int = 5, + common_password_check: bool = True, +): + return PasswordPolicy( + id=policy_id, + name=name, + status=status, + priority=priority, + max_attempts=max_attempts, + min_length=min_length, + min_upper_case=min_upper_case, + min_lower_case=min_lower_case, + min_number=min_number, + min_symbol=min_symbol, + min_age_minutes=min_age_minutes, + max_age_days=max_age_days, + history_count=history_count, + common_password_check=common_password_check, + ) + + +def authenticator( + auth_id: str = "aut-okta-verify", + key: str = "okta_verify", + name: str = "Okta Verify", + *, + status: str = "ACTIVE", + fips: str = "REQUIRED", +): + return OktaAuthenticator( + id=auth_id, + key=key, + name=name, + status=status, + type="app", + fips=fips, + ) diff --git a/tests/providers/okta/services/authenticator/authenticator_okta_verify_fips_compliant/authenticator_okta_verify_fips_compliant_test.py b/tests/providers/okta/services/authenticator/authenticator_okta_verify_fips_compliant/authenticator_okta_verify_fips_compliant_test.py new file mode 100644 index 0000000000..2a3423c4d4 --- /dev/null +++ b/tests/providers/okta/services/authenticator/authenticator_okta_verify_fips_compliant/authenticator_okta_verify_fips_compliant_test.py @@ -0,0 +1,64 @@ +from unittest import mock + +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider +from tests.providers.okta.services.authenticator.authenticator_fixtures import ( + authenticator, + build_authenticator_client, +) + +CHECK_PATH = ( + "prowler.providers.okta.services.authenticator." + "authenticator_okta_verify_fips_compliant." + "authenticator_okta_verify_fips_compliant.authenticator_client" +) + + +def _run_check(authenticator_client): + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_okta_provider(), + ), + mock.patch(CHECK_PATH, new=authenticator_client), + ): + from prowler.providers.okta.services.authenticator.authenticator_okta_verify_fips_compliant.authenticator_okta_verify_fips_compliant import ( + authenticator_okta_verify_fips_compliant, + ) + + return authenticator_okta_verify_fips_compliant().execute() + + +class Test_authenticator_okta_verify_fips_compliant: + def test_missing_authenticators_scope_returns_manual(self): + findings = _run_check( + build_authenticator_client( + authenticators={}, missing_scopes=["okta.authenticators.read"] + ) + ) + assert len(findings) == 1 + assert findings[0].status == "MANUAL" + assert "okta.authenticators.read" in findings[0].status_extended + + def test_okta_verify_fips_required_passes(self): + okta_verify = authenticator(key="okta_verify", fips="REQUIRED") + findings = _run_check( + build_authenticator_client(authenticators={okta_verify.id: okta_verify}) + ) + assert len(findings) == 1 + assert findings[0].status == "PASS" + assert findings[0].resource_id == okta_verify.id + + def test_okta_verify_without_fips_required_fails(self): + okta_verify = authenticator(key="okta_verify", fips="OPTIONAL") + findings = _run_check( + build_authenticator_client(authenticators={okta_verify.id: okta_verify}) + ) + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "FIPS" in findings[0].status_extended + + def test_missing_okta_verify_fails(self): + findings = _run_check(build_authenticator_client(authenticators={})) + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "Okta Verify authenticator is not active" in findings[0].status_extended diff --git a/tests/providers/okta/services/authenticator/authenticator_password_policy_checks_test.py b/tests/providers/okta/services/authenticator/authenticator_password_policy_checks_test.py new file mode 100644 index 0000000000..3ab28a353e --- /dev/null +++ b/tests/providers/okta/services/authenticator/authenticator_password_policy_checks_test.py @@ -0,0 +1,212 @@ +from unittest import mock + +import pytest + +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider +from tests.providers.okta.services.authenticator.authenticator_fixtures import ( + build_authenticator_client, + password_policy, +) + +PASSWORD_POLICY_CHECK_CASES = [ + ( + "authenticator_password_common_password_check", + "common_password_check", + True, + False, + "common-password dictionary checks", + ), + ( + "authenticator_password_complexity_lowercase", + "min_lower_case", + 1, + 0, + "at least one lowercase character", + ), + ( + "authenticator_password_complexity_number", + "min_number", + 1, + 0, + "at least one numeric character", + ), + ( + "authenticator_password_complexity_symbol", + "min_symbol", + 1, + 0, + "at least one symbol character", + ), + ( + "authenticator_password_complexity_uppercase", + "min_upper_case", + 1, + 0, + "at least one uppercase character", + ), + ( + "authenticator_password_history_5", + "history_count", + 5, + 4, + "password history of at least 5 previous passwords", + ), + ( + "authenticator_password_lockout_threshold_3", + "max_attempts", + 3, + 4, + "password lockout after 3 or fewer failed attempts", + ), + ( + "authenticator_password_maximum_age_60d", + "max_age_days", + 60, + 61, + "maximum password age of 60 days or less", + ), + ( + "authenticator_password_minimum_age_24h", + "min_age_minutes", + 1440, + 1439, + "minimum password age of at least 24 hours", + ), + ( + "authenticator_password_minimum_length_15", + "min_length", + 15, + 14, + "minimum password length of at least 15 characters", + ), +] + + +def _run_password_policy_check(check_name: str, authenticator_client): + check_path = ( + f"prowler.providers.okta.services.authenticator.{check_name}." + f"{check_name}.authenticator_client" + ) + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_okta_provider(), + ), + mock.patch(check_path, new=authenticator_client), + ): + module = __import__( + f"prowler.providers.okta.services.authenticator.{check_name}.{check_name}", + fromlist=[check_name], + ) + return getattr(module, check_name)().execute() + + +class Test_authenticator_password_policy_checks: + @pytest.mark.parametrize( + "check_name, field_name, compliant_value, non_compliant_value, expected_phrase", + PASSWORD_POLICY_CHECK_CASES, + ) + def test_missing_policies_scope_returns_manual( + self, + check_name, + field_name, + compliant_value, + non_compliant_value, + expected_phrase, + ): + findings = _run_password_policy_check( + check_name, + build_authenticator_client( + password_policies={}, missing_scopes=["okta.policies.read"] + ), + ) + + assert len(findings) == 1 + assert findings[0].status == "MANUAL" + assert "okta.policies.read" in findings[0].status_extended + + @pytest.mark.parametrize( + "check_name, field_name, compliant_value, non_compliant_value, expected_phrase", + PASSWORD_POLICY_CHECK_CASES, + ) + def test_no_active_password_policies_fails( + self, + check_name, + field_name, + compliant_value, + non_compliant_value, + expected_phrase, + ): + findings = _run_password_policy_check( + check_name, build_authenticator_client(password_policies={}) + ) + + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "No active Okta Password Policies" in findings[0].status_extended + assert expected_phrase in findings[0].status_extended + + @pytest.mark.parametrize( + "check_name, field_name, compliant_value, non_compliant_value, expected_phrase", + PASSWORD_POLICY_CHECK_CASES, + ) + def test_compliant_password_policy_passes( + self, + check_name, + field_name, + compliant_value, + non_compliant_value, + expected_phrase, + ): + policy = password_policy(**{field_name: compliant_value}) + findings = _run_password_policy_check( + check_name, + build_authenticator_client(password_policies={policy.id: policy}), + ) + + assert len(findings) == 1 + assert findings[0].status == "PASS" + assert findings[0].resource_id == policy.id + assert expected_phrase in findings[0].status_extended + + @pytest.mark.parametrize( + "check_name, field_name, compliant_value, non_compliant_value, expected_phrase", + PASSWORD_POLICY_CHECK_CASES, + ) + def test_non_compliant_password_policy_fails( + self, + check_name, + field_name, + compliant_value, + non_compliant_value, + expected_phrase, + ): + policy = password_policy(**{field_name: non_compliant_value}) + findings = _run_password_policy_check( + check_name, + build_authenticator_client(password_policies={policy.id: policy}), + ) + + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert findings[0].resource_id == policy.id + assert expected_phrase in findings[0].status_extended + + def test_multiple_active_password_policies_emit_one_finding_each(self): + check_name = "authenticator_password_minimum_length_15" + compliant = password_policy(policy_id="pol-good", name="Strict", min_length=15) + weak = password_policy( + policy_id="pol-weak", name="Weak", min_length=8, priority=2 + ) + + findings = _run_password_policy_check( + check_name, + build_authenticator_client( + password_policies={compliant.id: compliant, weak.id: weak} + ), + ) + + assert len(findings) == 2 + by_name = {finding.resource_name: finding for finding in findings} + assert by_name["Strict"].status == "PASS" + assert by_name["Weak"].status == "FAIL" diff --git a/tests/providers/okta/services/authenticator/authenticator_service_test.py b/tests/providers/okta/services/authenticator/authenticator_service_test.py new file mode 100644 index 0000000000..5f021687a7 --- /dev/null +++ b/tests/providers/okta/services/authenticator/authenticator_service_test.py @@ -0,0 +1,203 @@ +from types import SimpleNamespace +from unittest import mock + +import pytest + +from prowler.providers.okta.okta_provider import DEFAULT_SCOPES +from prowler.providers.okta.services.authenticator.authenticator_service import ( + AUTHENTICATORS_READ_SCOPE, + POLICIES_READ_SCOPE, + Authenticator, + OktaAuthenticator, + PasswordPolicy, +) +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider + + +def _resp(headers: dict = None): + return SimpleNamespace(headers=headers or {}) + + +def _sdk_password_policy(policy_id: str = "pol-password", name: str = "Default"): + return SimpleNamespace( + id=policy_id, + name=name, + priority=1, + status="ACTIVE", + system=True, + settings=SimpleNamespace( + password=SimpleNamespace( + lockout=SimpleNamespace(max_attempts=3), + complexity=SimpleNamespace( + min_length=15, + min_upper_case=1, + min_lower_case=1, + min_number=1, + min_symbol=1, + dictionary=SimpleNamespace(common=True), + ), + age=SimpleNamespace( + min_age_minutes=1440, + max_age_days=60, + history_count=5, + ), + ) + ), + ) + + +def _sdk_authenticator( + auth_id: str = "aut-okta-verify", + key: str = "okta_verify", + status: str = "ACTIVE", + fips: str = "REQUIRED", +): + return SimpleNamespace( + id=auth_id, + key=key, + name="Okta Verify" if key == "okta_verify" else "Smart Card IdP", + status=status, + type="app", + settings=SimpleNamespace(compliance=SimpleNamespace(fips=fips)), + ) + + +class Test_Authenticator_service: + def test_fetches_password_policies_and_authenticators(self): + provider = set_mocked_okta_provider() + policy = _sdk_password_policy() + okta_verify = _sdk_authenticator() + + async def fake_list_policies(type, after=None): + assert type == "PASSWORD" + assert after is None + return ([policy], _resp({}), None) + + async def fake_list_authenticators(): + return ([okta_verify], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_policies = fake_list_policies + mocked.list_authenticators = fake_list_authenticators + mocked_client_cls.return_value = mocked + + service = Authenticator(provider) + + assert isinstance(service.password_policies[policy.id], PasswordPolicy) + assert service.password_policies[policy.id].min_length == 15 + assert isinstance(service.authenticators[okta_verify.id], OktaAuthenticator) + assert service.authenticators[okta_verify.id].fips == "REQUIRED" + + def test_returns_empty_collections_on_api_errors(self): + provider = set_mocked_okta_provider() + + async def failing_policies(type, after=None): + assert type == "PASSWORD" + assert after is None + return ([], _resp({}), Exception("forbidden")) + + async def failing_authenticators(): + return ([], _resp({}), Exception("forbidden")) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_policies = failing_policies + mocked.list_authenticators = failing_authenticators + mocked_client_cls.return_value = mocked + service = Authenticator(provider) + + assert service.password_policies == {} + assert service.authenticators == {} + + def test_paginates_password_policies(self): + provider = set_mocked_okta_provider() + page_1 = _sdk_password_policy("pol-1", "First") + page_2 = _sdk_password_policy("pol-2", "Second") + next_link = '; rel="next"' + calls = [] + + async def fake_list_policies(type, after=None): + assert type == "PASSWORD" + calls.append(after) + if after is None: + return ([page_1], _resp({"link": next_link}), None) + return ([page_2], _resp({}), None) + + async def fake_list_authenticators(): + return ([], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_policies = fake_list_policies + mocked.list_authenticators = fake_list_authenticators + mocked_client_cls.return_value = mocked + service = Authenticator(provider) + + assert calls == [None, "cursor-2"] + assert set(service.password_policies.keys()) == {"pol-1", "pol-2"} + + @pytest.mark.parametrize( + "missing_scope, expected_calls, expected_policies, expected_authenticators", + [ + ( + POLICIES_READ_SCOPE, + ["list_authenticators"], + set(), + {"aut-okta-verify"}, + ), + ( + AUTHENTICATORS_READ_SCOPE, + ["list_policies"], + {"pol-password"}, + set(), + ), + ], + ) + def test_missing_scope_skips_corresponding_sdk_call( + self, + missing_scope, + expected_calls, + expected_policies, + expected_authenticators, + ): + provider = set_mocked_okta_provider( + scopes=[scope for scope in DEFAULT_SCOPES if scope != missing_scope] + ) + policy = _sdk_password_policy() + okta_verify = _sdk_authenticator() + calls = [] + + async def fake_list_policies(type, after=None): + if missing_scope == POLICIES_READ_SCOPE: + raise AssertionError("list_policies must not be called") + assert type == "PASSWORD" + assert after is None + calls.append("list_policies") + return ([policy], _resp({}), None) + + async def fake_list_authenticators(): + if missing_scope == AUTHENTICATORS_READ_SCOPE: + raise AssertionError("list_authenticators must not be called") + calls.append("list_authenticators") + return ([okta_verify], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_policies = fake_list_policies + mocked.list_authenticators = fake_list_authenticators + mocked_client_cls.return_value = mocked + service = Authenticator(provider) + + assert service.missing_scopes == [missing_scope] + assert set(service.password_policies.keys()) == expected_policies + assert set(service.authenticators.keys()) == expected_authenticators + assert calls == expected_calls diff --git a/tests/providers/okta/services/authenticator/authenticator_smart_card_active/authenticator_smart_card_active_test.py b/tests/providers/okta/services/authenticator/authenticator_smart_card_active/authenticator_smart_card_active_test.py new file mode 100644 index 0000000000..4a291783a9 --- /dev/null +++ b/tests/providers/okta/services/authenticator/authenticator_smart_card_active/authenticator_smart_card_active_test.py @@ -0,0 +1,73 @@ +from unittest import mock + +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider +from tests.providers.okta.services.authenticator.authenticator_fixtures import ( + authenticator, + build_authenticator_client, +) + +CHECK_PATH = ( + "prowler.providers.okta.services.authenticator." + "authenticator_smart_card_active.authenticator_smart_card_active.authenticator_client" +) + + +def _run_check(authenticator_client): + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_okta_provider(), + ), + mock.patch(CHECK_PATH, new=authenticator_client), + ): + from prowler.providers.okta.services.authenticator.authenticator_smart_card_active.authenticator_smart_card_active import ( + authenticator_smart_card_active, + ) + + return authenticator_smart_card_active().execute() + + +class Test_authenticator_smart_card_active: + def test_missing_authenticators_scope_returns_manual(self): + findings = _run_check( + build_authenticator_client( + authenticators={}, missing_scopes=["okta.authenticators.read"] + ) + ) + assert len(findings) == 1 + assert findings[0].status == "MANUAL" + assert "okta.authenticators.read" in findings[0].status_extended + + def test_smart_card_active_passes(self): + smart_card = authenticator( + auth_id="aut-smart-card", + key="smart_card_idp", + name="Smart Card IdP", + status="ACTIVE", + ) + findings = _run_check( + build_authenticator_client(authenticators={smart_card.id: smart_card}) + ) + assert len(findings) == 1 + assert findings[0].status == "PASS" + assert findings[0].resource_id == smart_card.id + + def test_missing_smart_card_fails(self): + findings = _run_check(build_authenticator_client(authenticators={})) + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "not active" in findings[0].status_extended + + def test_inactive_smart_card_fails(self): + smart_card = authenticator( + auth_id="aut-smart-card", + key="smart_card_idp", + name="Smart Card IdP", + status="INACTIVE", + ) + findings = _run_check( + build_authenticator_client(authenticators={smart_card.id: smart_card}) + ) + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "INACTIVE" in findings[0].status_extended diff --git a/tests/providers/okta/services/network_zone/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies_test.py b/tests/providers/okta/services/network_zone/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies_test.py new file mode 100644 index 0000000000..25e1655658 --- /dev/null +++ b/tests/providers/okta/services/network_zone/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies_test.py @@ -0,0 +1,87 @@ +from unittest import mock + +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider +from tests.providers.okta.services.network_zone.network_zone_fixtures import ( + build_network_zone_client, + network_zone, +) + +CHECK_PATH = ( + "prowler.providers.okta.services.network." + "network_zone_block_anonymized_proxies." + "network_zone_block_anonymized_proxies.network_zone_client" +) + + +def _run_check(network_zone_client): + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_okta_provider(), + ), + mock.patch(CHECK_PATH, new=network_zone_client), + ): + from prowler.providers.okta.services.network.network_zone_block_anonymized_proxies.network_zone_block_anonymized_proxies import ( + network_zone_block_anonymized_proxies, + ) + + return network_zone_block_anonymized_proxies().execute() + + +class Test_network_zone_block_anonymized_proxies: + def test_missing_network_zone_scope_returns_manual(self): + findings = _run_check( + build_network_zone_client({}, missing_scopes=["okta.networkZones.read"]) + ) + assert len(findings) == 1 + assert findings[0].status == "MANUAL" + assert "okta.networkZones.read" in findings[0].status_extended + + def test_no_zones_fails(self): + findings = _run_check(build_network_zone_client({})) + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "No active Okta Network Zone blocklist" in findings[0].status_extended + + def test_pass_with_active_ip_blocklist_gateway(self): + zone = network_zone(gateways=["198.51.100.10/32"]) + findings = _run_check(build_network_zone_client({zone.id: zone})) + assert len(findings) == 1 + assert findings[0].status == "PASS" + assert findings[0].resource_id == zone.id + assert "gateway" in findings[0].status_extended + + def test_pass_with_active_enhanced_dynamic_anonymizer_blocklist(self): + zone = network_zone( + zone_id="nzo-enhanced", + name="DefaultEnhancedDynamicZone", + zone_type="DYNAMIC_V2", + system=True, + ip_service_categories=["ANONYMIZER"], + ) + findings = _run_check(build_network_zone_client({zone.id: zone})) + assert len(findings) == 1 + assert findings[0].status == "PASS" + assert "Enhanced Dynamic" in findings[0].status_extended + + def test_existing_zones_without_anonymized_proxy_blocklist_fail(self): + policy_zone = network_zone( + zone_id="nzo-policy", + name="Corporate Policy Zone", + usage="POLICY", + gateways=["10.0.0.0/8"], + ) + inactive_blocklist = network_zone( + zone_id="nzo-inactive", + name="Inactive Blocklist", + status="INACTIVE", + gateways=["203.0.113.0/24"], + ) + findings = _run_check( + build_network_zone_client( + {policy_zone.id: policy_zone, inactive_blocklist.id: inactive_blocklist} + ) + ) + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "do not actively block" in findings[0].status_extended diff --git a/tests/providers/okta/services/network_zone/network_zone_fixtures.py b/tests/providers/okta/services/network_zone/network_zone_fixtures.py new file mode 100644 index 0000000000..a20ce0b56d --- /dev/null +++ b/tests/providers/okta/services/network_zone/network_zone_fixtures.py @@ -0,0 +1,37 @@ +from unittest import mock + +from prowler.providers.okta.services.network.network_zone_service import OktaNetworkZone +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider + + +def build_network_zone_client(zones: dict = None, missing_scopes: list[str] = None): + client = mock.MagicMock() + client.network_zones = zones or {} + client.missing_scopes = missing_scopes or [] + client.provider = set_mocked_okta_provider() + return client + + +def network_zone( + zone_id: str = "nzo-1", + name: str = "BlockedIpZone", + *, + status: str = "ACTIVE", + zone_type: str = "IP", + usage: str = "BLOCKLIST", + system: bool = False, + gateways: list[str] = None, + proxies: list[str] = None, + ip_service_categories: list[str] = None, +): + return OktaNetworkZone( + id=zone_id, + name=name, + status=status, + type=zone_type, + usage=usage, + system=system, + gateways=gateways or [], + proxies=proxies or [], + ip_service_categories=ip_service_categories or [], + ) diff --git a/tests/providers/okta/services/network_zone/network_zone_service_test.py b/tests/providers/okta/services/network_zone/network_zone_service_test.py new file mode 100644 index 0000000000..838959a672 --- /dev/null +++ b/tests/providers/okta/services/network_zone/network_zone_service_test.py @@ -0,0 +1,153 @@ +from types import SimpleNamespace +from unittest import mock + +import pytest + +from prowler.providers.okta.services.network.network_zone_service import ( + NETWORK_ZONES_READ_SCOPE, + NetworkZone, + OktaNetworkZone, + _next_after_cursor, +) +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider + + +def _resp(headers: dict = None): + return SimpleNamespace(headers=headers or {}) + + +def _sdk_zone( + zone_id: str, + name: str, + *, + status: str = "ACTIVE", + zone_type: str = "IP", + usage: str = "BLOCKLIST", + system: bool = False, + gateways: list[str] = None, + proxies: list[str] = None, + ip_service_categories: list[str] = None, +): + return SimpleNamespace( + id=zone_id, + name=name, + status=status, + type=zone_type, + usage=usage, + system=system, + gateways=gateways or [], + proxies=proxies or [], + ip_service_categories=ip_service_categories or [], + ) + + +class Test_network_zone_pagination: + def test_no_link_header_returns_none(self): + assert _next_after_cursor(_resp({})) is None + + def test_extracts_next_after_cursor(self): + link = ( + '; rel="self", ' + '; rel="next"' + ) + assert _next_after_cursor(_resp({"Link": link})) == "next-page" + + +class Test_NetworkZone_service: + def test_fetches_ip_and_enhanced_dynamic_zones(self): + provider = set_mocked_okta_provider() + ip_zone = _sdk_zone( + "nzo-ip", + "Blocked IPs", + gateways=["203.0.113.10/32"], + ) + enhanced_zone = _sdk_zone( + "nzo-enhanced", + "DefaultEnhancedDynamicZone", + zone_type="DYNAMIC_V2", + system=True, + ip_service_categories=["ANONYMIZER"], + ) + + async def fake_list_network_zones(after=None, limit=None): + assert after is None + assert limit == 200 + return ([ip_zone, enhanced_zone], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_network_zones = fake_list_network_zones + mocked_client_cls.return_value = mocked + + service = NetworkZone(provider) + + assert set(service.network_zones.keys()) == {"nzo-ip", "nzo-enhanced"} + assert isinstance(service.network_zones["nzo-ip"], OktaNetworkZone) + assert service.network_zones["nzo-ip"].gateways == ["203.0.113.10/32"] + assert service.network_zones["nzo-enhanced"].type == "DYNAMIC_V2" + assert service.network_zones["nzo-enhanced"].ip_service_categories == [ + "ANONYMIZER" + ] + + def test_paginates_network_zones(self): + provider = set_mocked_okta_provider() + page_1 = _sdk_zone("nzo-1", "First") + page_2 = _sdk_zone("nzo-2", "Second") + next_link = '; rel="next"' + calls = [] + + async def fake_list_network_zones(after=None, limit=None): + assert limit == 200 + calls.append(after) + if after is None: + return ([page_1], _resp({"link": next_link}), None) + return ([page_2], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_network_zones = fake_list_network_zones + mocked_client_cls.return_value = mocked + service = NetworkZone(provider) + + assert calls == [None, "cursor-2"] + assert set(service.network_zones.keys()) == {"nzo-1", "nzo-2"} + + def test_returns_empty_on_api_error(self): + provider = set_mocked_okta_provider() + + async def failing(after=None, limit=None): + assert after is None + assert limit == 200 + return ([], _resp({}), Exception("forbidden")) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_network_zones = failing + mocked_client_cls.return_value = mocked + service = NetworkZone(provider) + + assert service.network_zones == {} + + @pytest.mark.parametrize("missing_scope", [NETWORK_ZONES_READ_SCOPE]) + def test_missing_scope_skips_network_zones_sdk_call(self, missing_scope): + provider = set_mocked_okta_provider(scopes=[]) + + async def forbidden_list_network_zones(after=None, limit=None): + raise AssertionError("list_network_zones must not be called") + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_network_zones = forbidden_list_network_zones + mocked_client_cls.return_value = mocked + service = NetworkZone(provider) + + assert service.missing_scopes == [missing_scope] + assert service.network_zones == {}