feat(sdk): add Okta STIG service checks

- Add Network Zone, API Token, and Authenticator checks

- Handle missing Okta scopes with MANUAL findings

- Add service and check tests with parametrized password policy coverage
This commit is contained in:
Hugo P.Brito
2026-06-04 16:09:02 +02:00
parent 91b2829c6f
commit c623441882
77 changed files with 3292 additions and 2 deletions
@@ -0,0 +1,38 @@
from pydantic import BaseModel
from prowler.lib.check.models import CheckReportOkta
class MissingScopeResource(BaseModel):
"""Synthetic resource used when a check cannot evaluate an Okta API."""
id: str
name: str
def missing_scope_finding(
*,
metadata,
org_domain: str,
resource_id: str,
resource_name: str,
missing_scopes: list[str],
action: str,
) -> CheckReportOkta:
"""Build a MANUAL finding for checks blocked by missing OAuth scopes."""
resource = MissingScopeResource(id=resource_id, name=resource_name)
report = CheckReportOkta(
metadata=metadata,
resource=resource,
org_domain=org_domain,
resource_id=resource.id,
resource_name=resource.name,
)
report.status = "MANUAL"
report.status_extended = (
f"Prowler could not {action} because the Okta service app is missing "
f"required OAuth scope(s): {', '.join(missing_scopes)}. Grant the "
"scope(s) to the service app and rerun the check, or review the "
"configuration manually in the Okta Admin Console."
)
return report
@@ -32,3 +32,8 @@ class OktaService:
def _run(coro):
"""Run an okta-sdk-python coroutine from synchronous code."""
return asyncio.run(coro)
def _missing_scopes(self, required_scopes: list[str]) -> list[str]:
"""Return required OAuth scopes not granted to the Okta service app."""
granted_scopes = set(getattr(self.provider.session, "scopes", []) or [])
return [scope for scope in required_scopes if scope not in granted_scopes]
+8 -1
View File
@@ -32,7 +32,14 @@ from prowler.providers.okta.exceptions.exceptions import (
from prowler.providers.okta.lib.mutelist.mutelist import OktaMutelist
from prowler.providers.okta.models import OktaIdentityInfo, OktaSession
DEFAULT_SCOPES = ["okta.policies.read", "okta.brands.read"]
DEFAULT_SCOPES = [
"okta.policies.read",
"okta.brands.read",
"okta.networkZones.read",
"okta.apiTokens.read",
"okta.roles.read",
"okta.authenticators.read",
]
# Accept only Okta-managed domains. Custom (vanity) domains are rejected on
# purpose — they're a recurring source of typos and silent misconfig and
# Prowler's audience overwhelmingly uses Okta-managed hosts. The TLDs below
@@ -0,0 +1,4 @@
from prowler.providers.common.provider import Provider
from prowler.providers.okta.services.apitoken.api_token_service import ApiToken
api_token_client = ApiToken(Provider.get_global_provider())
@@ -0,0 +1,201 @@
from typing import Optional
from urllib.parse import parse_qs, urlparse
from pydantic import BaseModel, Field
from prowler.lib.logger import logger
from prowler.providers.okta.lib.service.service import OktaService
API_TOKENS_READ_SCOPE = "okta.apiTokens.read"
NETWORK_ZONES_READ_SCOPE = "okta.networkZones.read"
ROLES_READ_SCOPE = "okta.roles.read"
def _next_after_cursor(resp) -> Optional[str]:
"""Extract the Okta pagination cursor from a Link header."""
if resp is None:
return None
headers = getattr(resp, "headers", None) or {}
link = headers.get("link") or headers.get("Link") or ""
if not link:
return None
for part in link.split(","):
if 'rel="next"' not in part:
continue
url_segment = part.split(";", 1)[0].strip().lstrip("<").rstrip(">")
cursor = parse_qs(urlparse(url_segment).query).get("after", [None])[0]
if cursor:
return cursor
return None
def _normalise_sdk_result(result) -> tuple[list, object, object]:
"""Return `(items, response, error)` for Okta SDK list call variants."""
if isinstance(result, tuple):
err = result[-1]
items = result[0] or []
resp = result[1] if len(result) >= 3 else None
return list(items), resp, err
return list(result or []), None, None
def _value(value) -> str:
"""Return plain string values from Okta SDK enums and raw strings."""
if value is None:
return ""
enum_value = getattr(value, "value", None)
if enum_value is not None:
return str(enum_value)
return str(value)
class ApiToken(OktaService):
"""Fetches Okta API token metadata, token owners' roles, and zones."""
def __init__(self, provider):
super().__init__(__class__.__name__, provider)
self.missing_scopes: list[str] = self._missing_scopes(
[API_TOKENS_READ_SCOPE, NETWORK_ZONES_READ_SCOPE, ROLES_READ_SCOPE]
)
self.known_network_zone_ids: set[str] = self._list_known_network_zone_ids()
self.api_tokens: dict[str, OktaApiToken] = self._list_api_tokens()
def _list_api_tokens(self) -> dict[str, "OktaApiToken"]:
"""List active API token metadata and owner roles."""
if API_TOKENS_READ_SCOPE in self.missing_scopes:
logger.warning(
"ApiToken - Skipping API Tokens API call because required "
f"scope is missing: {API_TOKENS_READ_SCOPE}"
)
return {}
logger.info("ApiToken - Listing Okta API tokens...")
try:
return self._run(self._fetch_api_tokens())
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return {}
async def _fetch_api_tokens(self) -> dict[str, "OktaApiToken"]:
result: dict[str, OktaApiToken] = {}
items, _resp, err = _normalise_sdk_result(await self.client.list_api_tokens())
if err is not None:
logger.error(f"Error listing API tokens: {err}")
return result
for token in items:
token_id = _value(getattr(token, "id", None))
user_id = _value(getattr(token, "user_id", None))
roles = await self._fetch_user_role_types(user_id) if user_id else []
network = getattr(token, "network", None)
token_obj = OktaApiToken(
id=token_id,
name=_value(getattr(token, "name", None)) or token_id,
client_name=_value(getattr(token, "client_name", None)),
user_id=user_id,
network_connection=_value(getattr(network, "connection", None)),
network_includes=list(getattr(network, "include", None) or []),
network_excludes=list(getattr(network, "exclude", None) or []),
owner_roles=roles,
)
result[token_obj.id] = token_obj
return result
async def _fetch_user_role_types(self, user_id: str) -> list[str]:
"""Return normalized admin role types assigned to the token owner."""
if ROLES_READ_SCOPE in self.missing_scopes:
logger.warning(
"ApiToken - Skipping assigned role lookup for token owner "
f"{user_id} because required scope is missing: {ROLES_READ_SCOPE}"
)
return []
items, _resp, err = _normalise_sdk_result(
await self.client.list_assigned_roles_for_user(user_id)
)
if err is not None:
logger.error(f"Error listing roles for token owner {user_id}: {err}")
return []
roles = []
for role in items:
role_type = _value(getattr(role, "type", None))
role_label = _value(getattr(role, "label", None))
roles.append(role_type or role_label)
return [role for role in roles if role]
def _list_known_network_zone_ids(self) -> set[str]:
"""List known Network Zone ids and names for token condition validation."""
if API_TOKENS_READ_SCOPE in self.missing_scopes:
logger.warning(
"ApiToken - Skipping Network Zones API call because API token "
f"listing is unavailable without {API_TOKENS_READ_SCOPE}."
)
return set()
if NETWORK_ZONES_READ_SCOPE in self.missing_scopes:
logger.warning(
"ApiToken - Skipping Network Zones API call because required "
f"scope is missing: {NETWORK_ZONES_READ_SCOPE}"
)
return set()
logger.info("ApiToken - Listing Network Zones for token restrictions...")
try:
return self._run(self._fetch_known_network_zone_ids())
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return set()
async def _fetch_known_network_zone_ids(self) -> set[str]:
identifiers: set[str] = set()
items, err = await self._fetch_all_network_zones()
if err is not None:
logger.error(f"Error listing Network Zones for API token checks: {err}")
return identifiers
for zone in items:
zone_id = _value(getattr(zone, "id", None))
zone_name = _value(getattr(zone, "name", None))
if zone_id:
identifiers.add(zone_id)
if zone_name:
identifiers.add(zone_name)
return identifiers
async def _fetch_all_network_zones(self) -> tuple[list, object]:
"""Drain all Network Zone pages for API token reference validation."""
all_items = []
result = await self.client.list_network_zones(after=None, limit=200)
items, resp, err = _normalise_sdk_result(result)
if err is not None:
return [], err
all_items.extend(items)
while True:
cursor = _next_after_cursor(resp)
if not cursor:
break
result = await self.client.list_network_zones(after=cursor, limit=200)
items, resp, err = _normalise_sdk_result(result)
if err is not None:
return all_items, err
all_items.extend(items)
return all_items, None
class OktaApiToken(BaseModel):
"""Normalized Okta API token metadata used by checks."""
id: str
name: str
client_name: str = ""
user_id: str = ""
network_connection: str = ""
network_includes: list[str] = Field(default_factory=list)
network_excludes: list[str] = Field(default_factory=list)
owner_roles: list[str] = Field(default_factory=list)
class ApiTokenSummary(BaseModel):
"""Synthetic resource for org-level API token findings."""
id: str = "okta-api-tokens"
name: str = "Okta API Tokens"
@@ -0,0 +1,37 @@
{
"Provider": "okta",
"CheckID": "apitoken_not_super_admin",
"CheckTitle": "Okta API tokens are not owned by Super Admin users",
"CheckType": [],
"ServiceName": "apitoken",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "IAM",
"Description": "Prowler evaluates whether visible Okta API tokens are owned by users without the Super Admin role, because API tokens inherit the administrative permissions of their owner.",
"Risk": "When an API token is owned by a Super Admin, exposure of that token may grant broad organization administration capability to an attacker.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developer.okta.com/docs/guides/api-security/",
"https://developer.okta.com/docs/reference/api/roles/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "Create a dedicated service account, assign only required admin roles, rotate the API token, and revoke Super Admin-owned tokens.",
"Terraform": ""
},
"Recommendation": {
"Text": "Review API token owners, move tokens to dedicated service accounts with least-privilege admin roles, rotate affected tokens, and revoke Super Admin-owned tokens.",
"Url": "https://hub.prowler.com/check/apitoken_not_super_admin"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,60 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.lib.service.scope import missing_scope_finding
from prowler.providers.okta.services.apitoken.api_token_client import api_token_client
from prowler.providers.okta.services.apitoken.api_token_service import (
API_TOKENS_READ_SCOPE,
ROLES_READ_SCOPE,
)
from prowler.providers.okta.services.apitoken.lib.api_token_helpers import (
owner_has_super_admin,
)
class apitoken_not_super_admin(Check):
"""Ensure Okta API tokens are not owned by Super Admin users."""
def execute(self) -> list[CheckReportOkta]:
"""Evaluate every active API token owner's assigned admin roles."""
org_domain = api_token_client.provider.identity.org_domain
missing_scopes = [
scope
for scope in (API_TOKENS_READ_SCOPE, ROLES_READ_SCOPE)
if scope in api_token_client.missing_scopes
]
if missing_scopes:
return [
missing_scope_finding(
metadata=self.metadata(),
org_domain=org_domain,
resource_id="okta-api-tokens",
resource_name="Okta API Tokens",
missing_scopes=missing_scopes,
action="evaluate API token owner admin roles",
)
]
findings: list[CheckReportOkta] = []
for token in api_token_client.api_tokens.values():
report = CheckReportOkta(
metadata=self.metadata(), resource=token, org_domain=org_domain
)
if owner_has_super_admin(token):
report.status = "FAIL"
report.status_extended = (
f"API token '{token.name}' is owned by user '{token.user_id}' "
"with the Super Admin role. Use a dedicated service account "
"with least-privilege admin roles instead."
)
else:
roles = (
", ".join(token.owner_roles)
if token.owner_roles
else "no admin roles returned"
)
report.status = "PASS"
report.status_extended = (
f"API token '{token.name}' owner '{token.user_id}' is not "
f"assigned Super Admin ({roles})."
)
findings.append(report)
return findings
@@ -0,0 +1,37 @@
{
"Provider": "okta",
"CheckID": "apitoken_restricted_to_network_zone",
"CheckTitle": "Okta API tokens are restricted to known Network Zones",
"CheckType": [],
"ServiceName": "apitoken",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "IAM",
"Description": "Prowler evaluates whether visible Okta API tokens are constrained to known Network Zones rather than being usable from Any IP.",
"Risk": "When an API token can be used from Any IP or references unknown Network Zones, an exposed token may be replayed from attacker-controlled infrastructure.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developer.okta.com/docs/reference/api/api-tokens/",
"https://help.okta.com/oie/en-us/content/topics/security/api.htm"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "Security > API > Tokens: edit each token Security section and select specific Network Zones instead of Any IP.",
"Terraform": ""
},
"Recommendation": {
"Text": "Review each Okta API token and restrict it to one or more known IP-based Network Zones instead of Any IP.",
"Url": "https://hub.prowler.com/check/apitoken_restricted_to_network_zone"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,45 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.lib.service.scope import missing_scope_finding
from prowler.providers.okta.services.apitoken.api_token_client import api_token_client
from prowler.providers.okta.services.apitoken.api_token_service import (
API_TOKENS_READ_SCOPE,
NETWORK_ZONES_READ_SCOPE,
)
from prowler.providers.okta.services.apitoken.lib.api_token_helpers import (
network_zone_restriction_status,
)
class apitoken_restricted_to_network_zone(Check):
"""Ensure Okta API tokens are restricted to known Network Zones."""
def execute(self) -> list[CheckReportOkta]:
"""Evaluate every active API token's network condition."""
org_domain = api_token_client.provider.identity.org_domain
missing_scopes = [
scope
for scope in (API_TOKENS_READ_SCOPE, NETWORK_ZONES_READ_SCOPE)
if scope in api_token_client.missing_scopes
]
if missing_scopes:
return [
missing_scope_finding(
metadata=self.metadata(),
org_domain=org_domain,
resource_id="okta-api-tokens",
resource_name="Okta API Tokens",
missing_scopes=missing_scopes,
action="evaluate API token Network Zone restrictions",
)
]
findings: list[CheckReportOkta] = []
for token in api_token_client.api_tokens.values():
report = CheckReportOkta(
metadata=self.metadata(), resource=token, org_domain=org_domain
)
report.status, report.status_extended = network_zone_restriction_status(
token, api_token_client.known_network_zone_ids
)
findings.append(report)
return findings
@@ -0,0 +1,49 @@
from prowler.providers.okta.services.apitoken.api_token_service import OktaApiToken
ANYWHERE_CONNECTIONS = {"", "ANYWHERE", "ANY_IP"}
def network_zone_restriction_status(
token: OktaApiToken, known_network_zone_ids: set[str]
) -> tuple[str, str]:
"""Evaluate whether an API token is restricted to known Network Zones."""
connection = token.network_connection.upper()
if connection in ANYWHERE_CONNECTIONS:
return (
"FAIL",
f"API token '{token.name}' can be used from any IP address. "
"Restrict the token to one or more known Okta Network Zones.",
)
referenced_zones = token.network_includes + token.network_excludes
if not referenced_zones:
return (
"FAIL",
f"API token '{token.name}' is not open to Any IP, but it does not "
"reference a specific Okta Network Zone.",
)
unknown_zones = [
zone for zone in referenced_zones if zone not in known_network_zone_ids
]
if unknown_zones:
return (
"FAIL",
f"API token '{token.name}' references unknown Network Zone(s): "
f"{', '.join(unknown_zones)}.",
)
return (
"PASS",
f"API token '{token.name}' is restricted to known Okta Network Zone(s): "
f"{', '.join(referenced_zones)}.",
)
def owner_has_super_admin(token: OktaApiToken) -> bool:
"""Return True when any token owner role is Super Admin."""
for role in token.owner_roles:
normalized = role.strip().replace(" ", "_").upper()
if normalized in {"SUPER_ADMIN", "SUPER_ADMINISTRATOR"}:
return True
return False
@@ -0,0 +1,6 @@
from prowler.providers.common.provider import Provider
from prowler.providers.okta.services.authenticator.authenticator_service import (
Authenticator,
)
authenticator_client = Authenticator(Provider.get_global_provider())
@@ -0,0 +1,37 @@
{
"Provider": "okta",
"CheckID": "authenticator_okta_verify_fips_compliant",
"CheckTitle": "Okta Verify requires FIPS-compliant devices",
"CheckType": [],
"ServiceName": "authenticator",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "IAM",
"Description": "Prowler evaluates whether the Okta Verify authenticator requires FIPS-compliant devices for enrollment where FIPS-compliant authenticator usage is required.",
"Risk": "When Okta Verify enrollment allows non-FIPS devices, users may authenticate with devices that do not meet the expected cryptographic assurance level.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://help.okta.com/en-us/content/topics/mobile/ov-admin-config.htm",
"https://support.okta.com/help/s/article/How-to-Enable-FIPS-Encryption-on-Okta-Verify"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "Security > Authenticators > Okta Verify: set FIPS Compliance to users enrolling in Okta Verify can use FIPS compliant devices only.",
"Terraform": ""
},
"Recommendation": {
"Text": "Review the Okta Verify authenticator settings and enable FIPS compliance so enrollment is limited to FIPS-compliant devices.",
"Url": "https://hub.prowler.com/check/authenticator_okta_verify_fips_compliant"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,60 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.lib.service.scope import missing_scope_finding
from prowler.providers.okta.services.authenticator.authenticator_client import (
authenticator_client,
)
from prowler.providers.okta.services.authenticator.authenticator_service import (
AUTHENTICATORS_READ_SCOPE,
)
from prowler.providers.okta.services.authenticator.lib.authenticator_helpers import (
find_authenticator_by_key,
missing_authenticator_resource,
)
class authenticator_okta_verify_fips_compliant(Check):
"""Ensure Okta Verify restricts enrollment to FIPS-compliant devices."""
def execute(self) -> list[CheckReportOkta]:
"""Evaluate Okta Verify FIPS compliance settings."""
org_domain = authenticator_client.provider.identity.org_domain
if AUTHENTICATORS_READ_SCOPE in authenticator_client.missing_scopes:
return [
missing_scope_finding(
metadata=self.metadata(),
org_domain=org_domain,
resource_id="okta-authenticators",
resource_name="Okta Authenticators",
missing_scopes=[AUTHENTICATORS_READ_SCOPE],
action="evaluate Okta Verify FIPS compliance settings",
)
]
authenticator = find_authenticator_by_key(
authenticator_client.authenticators, "okta_verify"
)
resource = authenticator or missing_authenticator_resource(
"okta_verify", "Okta Verify authenticator"
)
report = CheckReportOkta(
metadata=self.metadata(), resource=resource, org_domain=org_domain
)
if not authenticator or authenticator.status.upper() != "ACTIVE":
report.status = "FAIL"
report.status_extended = (
"Okta Verify authenticator is not active or missing."
)
elif authenticator.fips.upper() == "REQUIRED":
report.status = "PASS"
report.status_extended = (
"Okta Verify authenticator requires FIPS-compliant devices "
"for enrollment."
)
else:
report.status = "FAIL"
report.status_extended = (
"Okta Verify authenticator is active but does not require "
f"FIPS-compliant devices for enrollment (current value: "
f"{authenticator.fips or 'unset'})."
)
return [report]
@@ -0,0 +1,37 @@
{
"Provider": "okta",
"CheckID": "authenticator_password_common_password_check",
"CheckTitle": "Okta password policies check passwords against common-password dictionaries",
"CheckType": [],
"ServiceName": "authenticator",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "IAM",
"Description": "Prowler evaluates whether every visible active Okta Password Policy enforces common-password dictionary checks.",
"Risk": "When active password policies are weak or inconsistent, users governed by less strict policies may face increased credential guessing, password reuse, and account takeover risk.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developer.okta.com/docs/api/openapi/okta-management/management/tags/policy",
"https://help.okta.com/en-us/content/topics/security/policies/configure-password-policies.htm"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "Security > Authentication > Password: edit each active password policy and adjust password settings to the STIG requirement.",
"Terraform": ""
},
"Recommendation": {
"Text": "Review every active Okta Password Policy and configure password settings to enforce common-password dictionary checks.",
"Url": "https://hub.prowler.com/check/authenticator_password_common_password_check"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,24 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.authenticator.authenticator_client import (
authenticator_client,
)
from prowler.providers.okta.services.authenticator.lib.password_policy_helpers import (
execute_password_policy_check,
)
class authenticator_password_common_password_check(Check):
"""Ensure Okta Password Policies enforce the required STIG setting."""
def execute(self) -> list[CheckReportOkta]:
"""Evaluate all active Okta Password Policies."""
return execute_password_policy_check(
metadata=self.metadata(),
org_domain=authenticator_client.provider.identity.org_domain,
password_policies=authenticator_client.password_policies,
missing_scopes=authenticator_client.missing_scopes,
field_name="common_password_check",
requirement="common-password dictionary checks",
compliant=lambda value: value is True,
actual_label="common password check enabled",
)
@@ -0,0 +1,37 @@
{
"Provider": "okta",
"CheckID": "authenticator_password_complexity_lowercase",
"CheckTitle": "Okta password policies require lowercase characters",
"CheckType": [],
"ServiceName": "authenticator",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "IAM",
"Description": "Prowler evaluates whether every visible active Okta Password Policy enforces at least one lowercase character.",
"Risk": "When active password policies are weak or inconsistent, users governed by less strict policies may face increased credential guessing, password reuse, and account takeover risk.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developer.okta.com/docs/api/openapi/okta-management/management/tags/policy",
"https://help.okta.com/en-us/content/topics/security/policies/configure-password-policies.htm"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "Security > Authentication > Password: edit each active password policy and adjust password settings to the STIG requirement.",
"Terraform": ""
},
"Recommendation": {
"Text": "Review every active Okta Password Policy and configure password settings to enforce at least one lowercase character.",
"Url": "https://hub.prowler.com/check/authenticator_password_complexity_lowercase"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,24 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.authenticator.authenticator_client import (
authenticator_client,
)
from prowler.providers.okta.services.authenticator.lib.password_policy_helpers import (
execute_password_policy_check,
)
class authenticator_password_complexity_lowercase(Check):
"""Ensure Okta Password Policies enforce the required STIG setting."""
def execute(self) -> list[CheckReportOkta]:
"""Evaluate all active Okta Password Policies."""
return execute_password_policy_check(
metadata=self.metadata(),
org_domain=authenticator_client.provider.identity.org_domain,
password_policies=authenticator_client.password_policies,
missing_scopes=authenticator_client.missing_scopes,
field_name="min_lower_case",
requirement="at least one lowercase character",
compliant=lambda value: value is not None and value >= 1,
actual_label="minimum lowercase characters",
)
@@ -0,0 +1,37 @@
{
"Provider": "okta",
"CheckID": "authenticator_password_complexity_number",
"CheckTitle": "Okta password policies require numeric characters",
"CheckType": [],
"ServiceName": "authenticator",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "IAM",
"Description": "Prowler evaluates whether every visible active Okta Password Policy enforces at least one numeric character.",
"Risk": "When active password policies are weak or inconsistent, users governed by less strict policies may face increased credential guessing, password reuse, and account takeover risk.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developer.okta.com/docs/api/openapi/okta-management/management/tags/policy",
"https://help.okta.com/en-us/content/topics/security/policies/configure-password-policies.htm"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "Security > Authentication > Password: edit each active password policy and adjust password settings to the STIG requirement.",
"Terraform": ""
},
"Recommendation": {
"Text": "Review every active Okta Password Policy and configure password settings to enforce at least one numeric character.",
"Url": "https://hub.prowler.com/check/authenticator_password_complexity_number"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,24 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.authenticator.authenticator_client import (
authenticator_client,
)
from prowler.providers.okta.services.authenticator.lib.password_policy_helpers import (
execute_password_policy_check,
)
class authenticator_password_complexity_number(Check):
"""Ensure Okta Password Policies enforce the required STIG setting."""
def execute(self) -> list[CheckReportOkta]:
"""Evaluate all active Okta Password Policies."""
return execute_password_policy_check(
metadata=self.metadata(),
org_domain=authenticator_client.provider.identity.org_domain,
password_policies=authenticator_client.password_policies,
missing_scopes=authenticator_client.missing_scopes,
field_name="min_number",
requirement="at least one numeric character",
compliant=lambda value: value is not None and value >= 1,
actual_label="minimum numeric characters",
)
@@ -0,0 +1,37 @@
{
"Provider": "okta",
"CheckID": "authenticator_password_complexity_symbol",
"CheckTitle": "Okta password policies require symbol characters",
"CheckType": [],
"ServiceName": "authenticator",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "IAM",
"Description": "Prowler evaluates whether every visible active Okta Password Policy enforces at least one symbol character.",
"Risk": "When active password policies are weak or inconsistent, users governed by less strict policies may face increased credential guessing, password reuse, and account takeover risk.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developer.okta.com/docs/api/openapi/okta-management/management/tags/policy",
"https://help.okta.com/en-us/content/topics/security/policies/configure-password-policies.htm"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "Security > Authentication > Password: edit each active password policy and adjust password settings to the STIG requirement.",
"Terraform": ""
},
"Recommendation": {
"Text": "Review every active Okta Password Policy and configure password settings to enforce at least one symbol character.",
"Url": "https://hub.prowler.com/check/authenticator_password_complexity_symbol"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,24 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.authenticator.authenticator_client import (
authenticator_client,
)
from prowler.providers.okta.services.authenticator.lib.password_policy_helpers import (
execute_password_policy_check,
)
class authenticator_password_complexity_symbol(Check):
"""Ensure Okta Password Policies enforce the required STIG setting."""
def execute(self) -> list[CheckReportOkta]:
"""Evaluate all active Okta Password Policies."""
return execute_password_policy_check(
metadata=self.metadata(),
org_domain=authenticator_client.provider.identity.org_domain,
password_policies=authenticator_client.password_policies,
missing_scopes=authenticator_client.missing_scopes,
field_name="min_symbol",
requirement="at least one symbol character",
compliant=lambda value: value is not None and value >= 1,
actual_label="minimum symbol characters",
)
@@ -0,0 +1,37 @@
{
"Provider": "okta",
"CheckID": "authenticator_password_complexity_uppercase",
"CheckTitle": "Okta password policies require uppercase characters",
"CheckType": [],
"ServiceName": "authenticator",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "IAM",
"Description": "Prowler evaluates whether every visible active Okta Password Policy enforces at least one uppercase character.",
"Risk": "When active password policies are weak or inconsistent, users governed by less strict policies may face increased credential guessing, password reuse, and account takeover risk.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developer.okta.com/docs/api/openapi/okta-management/management/tags/policy",
"https://help.okta.com/en-us/content/topics/security/policies/configure-password-policies.htm"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "Security > Authentication > Password: edit each active password policy and adjust password settings to the STIG requirement.",
"Terraform": ""
},
"Recommendation": {
"Text": "Review every active Okta Password Policy and configure password settings to enforce at least one uppercase character.",
"Url": "https://hub.prowler.com/check/authenticator_password_complexity_uppercase"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,24 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.authenticator.authenticator_client import (
authenticator_client,
)
from prowler.providers.okta.services.authenticator.lib.password_policy_helpers import (
execute_password_policy_check,
)
class authenticator_password_complexity_uppercase(Check):
"""Ensure Okta Password Policies enforce the required STIG setting."""
def execute(self) -> list[CheckReportOkta]:
"""Evaluate all active Okta Password Policies."""
return execute_password_policy_check(
metadata=self.metadata(),
org_domain=authenticator_client.provider.identity.org_domain,
password_policies=authenticator_client.password_policies,
missing_scopes=authenticator_client.missing_scopes,
field_name="min_upper_case",
requirement="at least one uppercase character",
compliant=lambda value: value is not None and value >= 1,
actual_label="minimum uppercase characters",
)
@@ -0,0 +1,37 @@
{
"Provider": "okta",
"CheckID": "authenticator_password_history_5",
"CheckTitle": "Okta password policies remember at least 5 previous passwords",
"CheckType": [],
"ServiceName": "authenticator",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "IAM",
"Description": "Prowler evaluates whether every visible active Okta Password Policy enforces password history of at least 5 previous passwords.",
"Risk": "When active password policies are weak or inconsistent, users governed by less strict policies may face increased credential guessing, password reuse, and account takeover risk.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developer.okta.com/docs/api/openapi/okta-management/management/tags/policy",
"https://help.okta.com/en-us/content/topics/security/policies/configure-password-policies.htm"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "Security > Authentication > Password: edit each active password policy and adjust password settings to the STIG requirement.",
"Terraform": ""
},
"Recommendation": {
"Text": "Review every active Okta Password Policy and configure password settings to enforce password history of at least 5 previous passwords.",
"Url": "https://hub.prowler.com/check/authenticator_password_history_5"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,24 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.authenticator.authenticator_client import (
authenticator_client,
)
from prowler.providers.okta.services.authenticator.lib.password_policy_helpers import (
execute_password_policy_check,
)
class authenticator_password_history_5(Check):
"""Ensure Okta Password Policies enforce the required STIG setting."""
def execute(self) -> list[CheckReportOkta]:
"""Evaluate all active Okta Password Policies."""
return execute_password_policy_check(
metadata=self.metadata(),
org_domain=authenticator_client.provider.identity.org_domain,
password_policies=authenticator_client.password_policies,
missing_scopes=authenticator_client.missing_scopes,
field_name="history_count",
requirement="password history of at least 5 previous passwords",
compliant=lambda value: value is not None and value >= 5,
actual_label="password history count",
)
@@ -0,0 +1,37 @@
{
"Provider": "okta",
"CheckID": "authenticator_password_lockout_threshold_3",
"CheckTitle": "Okta password policies lock accounts after 3 or fewer failed attempts",
"CheckType": [],
"ServiceName": "authenticator",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "IAM",
"Description": "Prowler evaluates whether every visible active Okta Password Policy enforces password lockout after 3 or fewer failed attempts.",
"Risk": "When active password policies are weak or inconsistent, users governed by less strict policies may face increased credential guessing, password reuse, and account takeover risk.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developer.okta.com/docs/api/openapi/okta-management/management/tags/policy",
"https://help.okta.com/en-us/content/topics/security/policies/configure-password-policies.htm"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "Security > Authentication > Password: edit each active password policy and adjust password settings to the STIG requirement.",
"Terraform": ""
},
"Recommendation": {
"Text": "Review every active Okta Password Policy and configure password settings to enforce password lockout after 3 or fewer failed attempts.",
"Url": "https://hub.prowler.com/check/authenticator_password_lockout_threshold_3"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,24 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.authenticator.authenticator_client import (
authenticator_client,
)
from prowler.providers.okta.services.authenticator.lib.password_policy_helpers import (
execute_password_policy_check,
)
class authenticator_password_lockout_threshold_3(Check):
"""Ensure Okta Password Policies enforce the required STIG setting."""
def execute(self) -> list[CheckReportOkta]:
"""Evaluate all active Okta Password Policies."""
return execute_password_policy_check(
metadata=self.metadata(),
org_domain=authenticator_client.provider.identity.org_domain,
password_policies=authenticator_client.password_policies,
missing_scopes=authenticator_client.missing_scopes,
field_name="max_attempts",
requirement="password lockout after 3 or fewer failed attempts",
compliant=lambda value: value is not None and value <= 3,
actual_label="maximum failed attempts",
)
@@ -0,0 +1,37 @@
{
"Provider": "okta",
"CheckID": "authenticator_password_maximum_age_60d",
"CheckTitle": "Okta password policies enforce a maximum password age of 60 days",
"CheckType": [],
"ServiceName": "authenticator",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "IAM",
"Description": "Prowler evaluates whether every visible active Okta Password Policy enforces maximum password age of 60 days or less.",
"Risk": "When active password policies are weak or inconsistent, users governed by less strict policies may face increased credential guessing, password reuse, and account takeover risk.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developer.okta.com/docs/api/openapi/okta-management/management/tags/policy",
"https://help.okta.com/en-us/content/topics/security/policies/configure-password-policies.htm"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "Security > Authentication > Password: edit each active password policy and adjust password settings to the STIG requirement.",
"Terraform": ""
},
"Recommendation": {
"Text": "Review every active Okta Password Policy and configure password settings to enforce maximum password age of 60 days or less.",
"Url": "https://hub.prowler.com/check/authenticator_password_maximum_age_60d"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,24 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.authenticator.authenticator_client import (
authenticator_client,
)
from prowler.providers.okta.services.authenticator.lib.password_policy_helpers import (
execute_password_policy_check,
)
class authenticator_password_maximum_age_60d(Check):
"""Ensure Okta Password Policies enforce the required STIG setting."""
def execute(self) -> list[CheckReportOkta]:
"""Evaluate all active Okta Password Policies."""
return execute_password_policy_check(
metadata=self.metadata(),
org_domain=authenticator_client.provider.identity.org_domain,
password_policies=authenticator_client.password_policies,
missing_scopes=authenticator_client.missing_scopes,
field_name="max_age_days",
requirement="maximum password age of 60 days or less",
compliant=lambda value: value is not None and 0 < value <= 60,
actual_label="maximum age days",
)
@@ -0,0 +1,37 @@
{
"Provider": "okta",
"CheckID": "authenticator_password_minimum_age_24h",
"CheckTitle": "Okta password policies enforce a 24-hour minimum password age",
"CheckType": [],
"ServiceName": "authenticator",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "IAM",
"Description": "Prowler evaluates whether every visible active Okta Password Policy enforces minimum password age of at least 24 hours.",
"Risk": "When active password policies are weak or inconsistent, users governed by less strict policies may face increased credential guessing, password reuse, and account takeover risk.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developer.okta.com/docs/api/openapi/okta-management/management/tags/policy",
"https://help.okta.com/en-us/content/topics/security/policies/configure-password-policies.htm"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "Security > Authentication > Password: edit each active password policy and adjust password settings to the STIG requirement.",
"Terraform": ""
},
"Recommendation": {
"Text": "Review every active Okta Password Policy and configure password settings to enforce minimum password age of at least 24 hours.",
"Url": "https://hub.prowler.com/check/authenticator_password_minimum_age_24h"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,24 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.authenticator.authenticator_client import (
authenticator_client,
)
from prowler.providers.okta.services.authenticator.lib.password_policy_helpers import (
execute_password_policy_check,
)
class authenticator_password_minimum_age_24h(Check):
"""Ensure Okta Password Policies enforce the required STIG setting."""
def execute(self) -> list[CheckReportOkta]:
"""Evaluate all active Okta Password Policies."""
return execute_password_policy_check(
metadata=self.metadata(),
org_domain=authenticator_client.provider.identity.org_domain,
password_policies=authenticator_client.password_policies,
missing_scopes=authenticator_client.missing_scopes,
field_name="min_age_minutes",
requirement="minimum password age of at least 24 hours",
compliant=lambda value: value is not None and value >= 1440,
actual_label="minimum age minutes",
)
@@ -0,0 +1,37 @@
{
"Provider": "okta",
"CheckID": "authenticator_password_minimum_length_15",
"CheckTitle": "Okta password policies require at least 15 characters",
"CheckType": [],
"ServiceName": "authenticator",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "IAM",
"Description": "Prowler evaluates whether every visible active Okta Password Policy enforces minimum password length of at least 15 characters.",
"Risk": "When active password policies are weak or inconsistent, users governed by less strict policies may face increased credential guessing, password reuse, and account takeover risk.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developer.okta.com/docs/api/openapi/okta-management/management/tags/policy",
"https://help.okta.com/en-us/content/topics/security/policies/configure-password-policies.htm"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "Security > Authentication > Password: edit each active password policy and adjust password settings to the STIG requirement.",
"Terraform": ""
},
"Recommendation": {
"Text": "Review every active Okta Password Policy and configure password settings to enforce minimum password length of at least 15 characters.",
"Url": "https://hub.prowler.com/check/authenticator_password_minimum_length_15"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,24 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.authenticator.authenticator_client import (
authenticator_client,
)
from prowler.providers.okta.services.authenticator.lib.password_policy_helpers import (
execute_password_policy_check,
)
class authenticator_password_minimum_length_15(Check):
"""Ensure Okta Password Policies enforce the required STIG setting."""
def execute(self) -> list[CheckReportOkta]:
"""Evaluate all active Okta Password Policies."""
return execute_password_policy_check(
metadata=self.metadata(),
org_domain=authenticator_client.provider.identity.org_domain,
password_policies=authenticator_client.password_policies,
missing_scopes=authenticator_client.missing_scopes,
field_name="min_length",
requirement="minimum password length of at least 15 characters",
compliant=lambda value: value is not None and value >= 15,
actual_label="minimum length",
)
@@ -0,0 +1,246 @@
from typing import Optional
from urllib.parse import parse_qs, urlparse
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.okta.lib.service.service import OktaService
AUTHENTICATORS_READ_SCOPE = "okta.authenticators.read"
POLICIES_READ_SCOPE = "okta.policies.read"
def _next_after_cursor(resp) -> Optional[str]:
"""Extract the Okta pagination cursor from a Link header."""
if resp is None:
return None
headers = getattr(resp, "headers", None) or {}
link = headers.get("link") or headers.get("Link") or ""
if not link:
return None
for part in link.split(","):
if 'rel="next"' not in part:
continue
url_segment = part.split(";", 1)[0].strip().lstrip("<").rstrip(">")
cursor = parse_qs(urlparse(url_segment).query).get("after", [None])[0]
if cursor:
return cursor
return None
def _normalise_sdk_result(result) -> tuple[list, object, object]:
"""Return `(items, response, error)` for Okta SDK list call variants."""
if isinstance(result, tuple):
err = result[-1]
items = result[0] or []
resp = result[1] if len(result) >= 3 else None
return list(items), resp, err
return list(result or []), None, None
def _value(value) -> str:
"""Return plain string values from Okta SDK enums and raw strings."""
if value is None:
return ""
enum_value = getattr(value, "value", None)
if enum_value is not None:
return str(enum_value)
return str(value)
def _int_or_none(value) -> Optional[int]:
if value is None:
return None
try:
return int(value)
except (TypeError, ValueError):
return None
def _bool_or_none(value) -> Optional[bool]:
if value is None:
return None
if isinstance(value, bool):
return value
return bool(value)
class Authenticator(OktaService):
"""Fetches Okta Password Policies and Authenticators for STIG checks."""
def __init__(self, provider):
super().__init__(__class__.__name__, provider)
self.missing_scopes: list[str] = self._missing_scopes(
[POLICIES_READ_SCOPE, AUTHENTICATORS_READ_SCOPE]
)
self.password_policies: dict[str, PasswordPolicy] = (
self._list_password_policies()
)
self.authenticators: dict[str, OktaAuthenticator] = self._list_authenticators()
def _list_password_policies(self) -> dict[str, "PasswordPolicy"]:
"""List PASSWORD policies with normalized password settings."""
if POLICIES_READ_SCOPE in self.missing_scopes:
logger.warning(
"Authenticator - Skipping Policies API call because required "
f"scope is missing: {POLICIES_READ_SCOPE}"
)
return {}
logger.info("Authenticator - Listing Okta PASSWORD policies...")
try:
return self._run(self._fetch_password_policies())
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return {}
async def _fetch_password_policies(self) -> dict[str, "PasswordPolicy"]:
result: dict[str, PasswordPolicy] = {}
items, err = await self._paginate(
lambda after: self.client.list_policies(type="PASSWORD", after=after)
)
if err is not None:
logger.error(f"Error listing PASSWORD policies: {err}")
return result
for policy in items:
policy_obj = self._build_password_policy(policy)
result[policy_obj.id] = policy_obj
return result
def _list_authenticators(self) -> dict[str, "OktaAuthenticator"]:
"""List org authenticators with normalized settings."""
if AUTHENTICATORS_READ_SCOPE in self.missing_scopes:
logger.warning(
"Authenticator - Skipping Authenticators API call because required "
f"scope is missing: {AUTHENTICATORS_READ_SCOPE}"
)
return {}
logger.info("Authenticator - Listing Okta authenticators...")
try:
return self._run(self._fetch_authenticators())
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return {}
async def _fetch_authenticators(self) -> dict[str, "OktaAuthenticator"]:
result: dict[str, OktaAuthenticator] = {}
items, _resp, err = _normalise_sdk_result(
await self.client.list_authenticators()
)
if err is not None:
logger.error(f"Error listing authenticators: {err}")
return result
for authenticator in items:
auth_obj = self._build_authenticator(authenticator)
result[auth_obj.id] = auth_obj
return result
@staticmethod
async def _paginate(fetch):
"""Drain all pages of an SDK list call using Okta Link headers."""
all_items = []
result = await fetch(None)
items, resp, err = _normalise_sdk_result(result)
if err is not None:
return [], err
all_items.extend(items)
while True:
cursor = _next_after_cursor(resp)
if not cursor:
break
result = await fetch(cursor)
items, resp, err = _normalise_sdk_result(result)
if err is not None:
return all_items, err
all_items.extend(items)
return all_items, None
@staticmethod
def _build_password_policy(policy) -> "PasswordPolicy":
settings = getattr(policy, "settings", None)
password_settings = getattr(settings, "password", None) if settings else None
lockout = (
getattr(password_settings, "lockout", None) if password_settings else None
)
complexity = (
getattr(password_settings, "complexity", None)
if password_settings
else None
)
dictionary = getattr(complexity, "dictionary", None) if complexity else None
age = getattr(password_settings, "age", None) if password_settings else None
policy_id = _value(getattr(policy, "id", None))
return PasswordPolicy(
id=policy_id,
name=_value(getattr(policy, "name", None)) or policy_id,
status=_value(getattr(policy, "status", None)),
priority=_int_or_none(getattr(policy, "priority", None)),
is_default=bool(getattr(policy, "system", False)),
max_attempts=_int_or_none(getattr(lockout, "max_attempts", None)),
min_length=_int_or_none(getattr(complexity, "min_length", None)),
min_upper_case=_int_or_none(getattr(complexity, "min_upper_case", None)),
min_lower_case=_int_or_none(getattr(complexity, "min_lower_case", None)),
min_number=_int_or_none(getattr(complexity, "min_number", None)),
min_symbol=_int_or_none(getattr(complexity, "min_symbol", None)),
min_age_minutes=_int_or_none(getattr(age, "min_age_minutes", None)),
max_age_days=_int_or_none(getattr(age, "max_age_days", None)),
history_count=_int_or_none(getattr(age, "history_count", None)),
common_password_check=_bool_or_none(getattr(dictionary, "common", None)),
)
@staticmethod
def _build_authenticator(authenticator) -> "OktaAuthenticator":
settings = getattr(authenticator, "settings", None)
compliance = getattr(settings, "compliance", None) if settings else None
auth_id = _value(getattr(authenticator, "id", None))
return OktaAuthenticator(
id=auth_id,
key=_value(getattr(authenticator, "key", None)),
name=_value(getattr(authenticator, "name", None)) or auth_id,
status=_value(getattr(authenticator, "status", None)),
type=_value(getattr(authenticator, "type", None)),
fips=_value(getattr(compliance, "fips", None)),
)
class PasswordPolicy(BaseModel):
"""Normalized Okta Password Policy settings used by checks."""
id: str
name: str
status: str = ""
priority: Optional[int] = None
is_default: bool = False
max_attempts: Optional[int] = None
min_length: Optional[int] = None
min_upper_case: Optional[int] = None
min_lower_case: Optional[int] = None
min_number: Optional[int] = None
min_symbol: Optional[int] = None
min_age_minutes: Optional[int] = None
max_age_days: Optional[int] = None
history_count: Optional[int] = None
common_password_check: Optional[bool] = None
class OktaAuthenticator(BaseModel):
"""Normalized Okta Authenticator settings used by checks."""
id: str
key: str
name: str
status: str = ""
type: str = ""
fips: str = ""
class AuthenticatorSummary(BaseModel):
"""Synthetic resource for org-level authenticator findings."""
id: str
name: str
@@ -0,0 +1,36 @@
{
"Provider": "okta",
"CheckID": "authenticator_smart_card_active",
"CheckTitle": "Okta Smart Card authenticator is active",
"CheckType": [],
"ServiceName": "authenticator",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "IAM",
"Description": "Prowler evaluates whether the Smart Card IdP authenticator is configured and active for certificate-based authentication scenarios required by policy.",
"Risk": "When Smart Card authentication is unavailable, users may be unable to satisfy certificate-based authentication requirements where they are mandated.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developer.okta.com/docs/api/openapi/okta-management/management/tags/authenticator"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "Security > Authenticators: add or activate the Smart Card IdP authenticator.",
"Terraform": ""
},
"Recommendation": {
"Text": "Review Okta Authenticators and configure or activate the Smart Card IdP authenticator where certificate-based authentication is required.",
"Url": "https://hub.prowler.com/check/authenticator_smart_card_active"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,56 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.lib.service.scope import missing_scope_finding
from prowler.providers.okta.services.authenticator.authenticator_client import (
authenticator_client,
)
from prowler.providers.okta.services.authenticator.authenticator_service import (
AUTHENTICATORS_READ_SCOPE,
)
from prowler.providers.okta.services.authenticator.lib.authenticator_helpers import (
find_authenticator_by_key,
missing_authenticator_resource,
)
class authenticator_smart_card_active(Check):
"""Ensure the Smart Card IdP authenticator is active."""
def execute(self) -> list[CheckReportOkta]:
"""Evaluate the Smart Card IdP authenticator status."""
org_domain = authenticator_client.provider.identity.org_domain
if AUTHENTICATORS_READ_SCOPE in authenticator_client.missing_scopes:
return [
missing_scope_finding(
metadata=self.metadata(),
org_domain=org_domain,
resource_id="okta-authenticators",
resource_name="Okta Authenticators",
missing_scopes=[AUTHENTICATORS_READ_SCOPE],
action="evaluate Smart Card IdP authenticator status",
)
]
authenticator = find_authenticator_by_key(
authenticator_client.authenticators, "smart_card_idp"
)
resource = authenticator or missing_authenticator_resource(
"smart_card_idp", "Smart Card IdP authenticator"
)
report = CheckReportOkta(
metadata=self.metadata(), resource=resource, org_domain=org_domain
)
if authenticator and authenticator.status.upper() == "ACTIVE":
report.status = "PASS"
report.status_extended = "Smart Card IdP authenticator is ACTIVE."
elif authenticator:
report.status = "FAIL"
report.status_extended = (
f"Smart Card IdP authenticator is not active; current status is "
f"{authenticator.status}."
)
else:
report.status = "FAIL"
report.status_extended = (
"Smart Card IdP authenticator is not active or missing."
)
return [report]
@@ -0,0 +1,19 @@
from prowler.providers.okta.services.authenticator.authenticator_service import (
AuthenticatorSummary,
OktaAuthenticator,
)
def find_authenticator_by_key(
authenticators: dict[str, OktaAuthenticator], key: str
) -> OktaAuthenticator | None:
"""Return the first authenticator with the requested key."""
for authenticator in authenticators.values():
if authenticator.key == key:
return authenticator
return None
def missing_authenticator_resource(key: str, name: str) -> AuthenticatorSummary:
"""Build a synthetic resource for a missing authenticator."""
return AuthenticatorSummary(id=f"{key}-missing", name=name)
@@ -0,0 +1,101 @@
from collections.abc import Callable
from prowler.lib.check.models import CheckReportOkta
from prowler.providers.okta.lib.service.scope import missing_scope_finding
from prowler.providers.okta.services.authenticator.authenticator_service import (
POLICIES_READ_SCOPE,
PasswordPolicy,
)
def active_password_policies(
password_policies: dict[str, PasswordPolicy],
) -> list[PasswordPolicy]:
"""Return active password policies sorted by priority."""
return sorted(
[
policy
for policy in password_policies.values()
if not policy.status or policy.status.upper() == "ACTIVE"
],
key=lambda policy: (
policy.priority if policy.priority is not None else float("inf"),
policy.name,
),
)
def password_policy_label(policy: PasswordPolicy) -> str:
kind = "default" if policy.is_default else "custom"
priority = policy.priority if policy.priority is not None else "unset"
return f"Password Policy '{policy.name}' (priority {priority}, {kind})"
def no_active_password_policies_finding(
metadata, org_domain: str, requirement: str
) -> CheckReportOkta:
"""Build the FAIL finding emitted when no active password policies exist."""
placeholder = PasswordPolicy(
id="password-policies-missing",
name="(no active password policies)",
status="MISSING",
)
report = CheckReportOkta(
metadata=metadata, resource=placeholder, org_domain=org_domain
)
report.status = "FAIL"
report.status_extended = (
"No active Okta Password Policies were returned by the API. "
f"The organization must enforce: {requirement}."
)
return report
def execute_password_policy_check(
*,
metadata,
org_domain: str,
password_policies: dict[str, PasswordPolicy],
missing_scopes: list[str],
field_name: str,
requirement: str,
compliant: Callable[[object], bool],
actual_label: str,
) -> list[CheckReportOkta]:
"""Evaluate a scalar password-policy setting across all active policies."""
if POLICIES_READ_SCOPE in missing_scopes:
return [
missing_scope_finding(
metadata=metadata,
org_domain=org_domain,
resource_id="okta-password-policies",
resource_name="Okta Password Policies",
missing_scopes=[POLICIES_READ_SCOPE],
action="evaluate active Password Policy settings",
)
]
policies = active_password_policies(password_policies)
if not policies:
return [no_active_password_policies_finding(metadata, org_domain, requirement)]
findings: list[CheckReportOkta] = []
for policy in policies:
actual = getattr(policy, field_name)
report = CheckReportOkta(
metadata=metadata, resource=policy, org_domain=org_domain
)
if compliant(actual):
report.status = "PASS"
report.status_extended = (
f"{password_policy_label(policy)} enforces {requirement} "
f"({actual_label}: {actual})."
)
else:
report.status = "FAIL"
report.status_extended = (
f"{password_policy_label(policy)} does not enforce {requirement} "
f"({actual_label}: {actual})."
)
findings.append(report)
return findings
@@ -0,0 +1,53 @@
from prowler.providers.okta.services.network.network_zone_service import OktaNetworkZone
ANONYMIZER_CATEGORY_MARKERS = (
"ANONYM",
"PROXY",
"TOR",
"VPN",
)
def active_blocklist_zones(
network_zones: dict[str, OktaNetworkZone],
) -> list[OktaNetworkZone]:
"""Return active Network Zones configured for blocklist usage."""
return sorted(
[
zone
for zone in network_zones.values()
if zone.status.upper() == "ACTIVE" and zone.usage.upper() == "BLOCKLIST"
],
key=lambda zone: (zone.name, zone.id),
)
def is_ip_blocklist_with_entries(zone: OktaNetworkZone) -> bool:
"""Return True when an IP blocklist zone contains gateway/proxy entries."""
return zone.type.upper() == "IP" and bool(zone.gateways or zone.proxies)
def is_enhanced_dynamic_anonymizer_blocklist(zone: OktaNetworkZone) -> bool:
"""Return True for active Enhanced Dynamic blocklists covering anonymizers."""
if zone.type.upper() != "DYNAMIC_V2":
return False
if zone.system and zone.name == "DefaultEnhancedDynamicZone":
return True
categories = [category.upper() for category in zone.ip_service_categories]
return any(
marker in category
for category in categories
for marker in ANONYMIZER_CATEGORY_MARKERS
)
def compliant_anonymized_proxy_blocklist(
network_zones: dict[str, OktaNetworkZone],
) -> tuple[OktaNetworkZone | None, str]:
"""Find the Network Zone that satisfies anonymized-proxy blocklisting."""
for zone in active_blocklist_zones(network_zones):
if is_ip_blocklist_with_entries(zone):
return zone, "active IP blocklist with gateway or proxy IP entries"
if is_enhanced_dynamic_anonymizer_blocklist(zone):
return zone, "active Enhanced Dynamic Zone blocklist for anonymizers"
return None, ""
@@ -0,0 +1,37 @@
{
"Provider": "okta",
"CheckID": "network_zone_block_anonymized_proxies",
"CheckTitle": "Okta blocks anonymized proxy access with active Network Zone blocklists",
"CheckType": [],
"ServiceName": "network",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "network",
"Description": "Prowler evaluates whether Okta has an active Network Zone blocklist that can block anonymized proxy sources, either through gateway/proxy IP entries or an Enhanced Dynamic Zone anonymizer category.",
"Risk": "When anonymized proxy sources are not covered by active Okta Network Zone blocklists, attackers may hide their source network while attempting credential attacks or session establishment from untrusted infrastructure.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developer.okta.com/docs/api/openapi/okta-management/management/tags/networkzone",
"https://help.okta.com/en-us/content/topics/security/network/about-enhanced-dynamic-zones.htm"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "Security > Networks: configure BlockedIpZone gateway/proxy IP entries or activate DefaultEnhancedDynamicZone / Enhanced Dynamic Zone blocklisting for anonymizers.",
"Terraform": ""
},
"Recommendation": {
"Text": "Review Okta Network Zones and configure an active IP blocklist or Enhanced Dynamic Zone blocklist that covers anonymizers before authentication.",
"Url": "https://hub.prowler.com/check/network_zone_block_anonymized_proxies"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,52 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.lib.service.scope import missing_scope_finding
from prowler.providers.okta.services.network.lib.network_zone_helpers import (
compliant_anonymized_proxy_blocklist,
)
from prowler.providers.okta.services.network.network_zone_client import (
network_zone_client,
)
from prowler.providers.okta.services.network.network_zone_service import (
NetworkZoneSummary,
)
class network_zone_block_anonymized_proxies(Check):
"""Ensure Okta actively blocks anonymized proxy sources before auth."""
def execute(self) -> list[CheckReportOkta]:
"""Evaluate whether an active blocklist covers anonymized proxies."""
org_domain = network_zone_client.provider.identity.org_domain
if network_zone_client.missing_scopes:
return [
missing_scope_finding(
metadata=self.metadata(),
org_domain=org_domain,
resource_id="okta-network-zones",
resource_name="Okta Network Zones",
missing_scopes=network_zone_client.missing_scopes,
action="evaluate Network Zone anonymized proxy blocklists",
)
]
matching_zone, reason = compliant_anonymized_proxy_blocklist(
network_zone_client.network_zones
)
resource = matching_zone or NetworkZoneSummary()
report = CheckReportOkta(
metadata=self.metadata(), resource=resource, org_domain=org_domain
)
if matching_zone:
report.status = "PASS"
report.status_extended = (
f"Okta Network Zone '{matching_zone.name}' is an {reason}."
)
else:
report.status = "FAIL"
report.status_extended = (
"No active Okta Network Zone blocklist was found that blocks "
"anonymized proxies. Existing zones do not actively block gateway "
"or proxy IPs, nor an Enhanced Dynamic Zone anonymizer category."
)
return [report]
@@ -0,0 +1,4 @@
from prowler.providers.common.provider import Provider
from prowler.providers.okta.services.network.network_zone_service import NetworkZone
network_zone_client = NetworkZone(Provider.get_global_provider())
@@ -0,0 +1,152 @@
from typing import Optional
from urllib.parse import parse_qs, urlparse
from pydantic import BaseModel, Field
from prowler.lib.logger import logger
from prowler.providers.okta.lib.service.service import OktaService
NETWORK_ZONES_READ_SCOPE = "okta.networkZones.read"
def _next_after_cursor(resp) -> Optional[str]:
"""Extract the Okta pagination cursor from a Link header."""
if resp is None:
return None
headers = getattr(resp, "headers", None) or {}
link = headers.get("link") or headers.get("Link") or ""
if not link:
return None
for part in link.split(","):
if 'rel="next"' not in part:
continue
url_segment = part.split(";", 1)[0].strip().lstrip("<").rstrip(">")
cursor = parse_qs(urlparse(url_segment).query).get("after", [None])[0]
if cursor:
return cursor
return None
def _normalise_sdk_result(result) -> tuple[list, object, object]:
"""Return `(items, response, error)` for Okta SDK list call variants."""
if isinstance(result, tuple):
err = result[-1]
items = result[0] or []
resp = result[1] if len(result) >= 3 else None
return list(items), resp, err
return list(result or []), None, None
def _value(value) -> str:
"""Return plain string values from Okta SDK enums and raw strings."""
if value is None:
return ""
enum_value = getattr(value, "value", None)
if enum_value is not None:
return str(enum_value)
return str(value)
class NetworkZone(OktaService):
"""Fetches Okta Network Zones for STIG network-zone checks."""
def __init__(self, provider):
super().__init__(__class__.__name__, provider)
self.missing_scopes: list[str] = self._missing_scopes(
[NETWORK_ZONES_READ_SCOPE]
)
self.network_zones: dict[str, OktaNetworkZone] = self._list_network_zones()
def _list_network_zones(self) -> dict[str, "OktaNetworkZone"]:
"""List all Network Zones visible to the configured Okta service app."""
if self.missing_scopes:
logger.warning(
"NetworkZone - Skipping Network Zones API call because required "
f"scope(s) are missing: {', '.join(self.missing_scopes)}"
)
return {}
logger.info("NetworkZone - Listing Okta Network Zones...")
try:
return self._run(self._fetch_all())
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return {}
async def _fetch_all(self) -> dict[str, "OktaNetworkZone"]:
result: dict[str, OktaNetworkZone] = {}
all_zones, err = await self._paginate(
lambda after: self.client.list_network_zones(after=after, limit=200)
)
if err is not None:
logger.error(f"Error listing Network Zones: {err}")
return result
for zone in all_zones:
zone_obj = self._build_zone(zone)
result[zone_obj.id] = zone_obj
return result
@staticmethod
async def _paginate(fetch):
"""Drain all pages of an SDK list call using Okta Link headers."""
all_items = []
result = await fetch(None)
items, resp, err = _normalise_sdk_result(result)
if err is not None:
return [], err
all_items.extend(items)
while True:
cursor = _next_after_cursor(resp)
if not cursor:
break
result = await fetch(cursor)
items, resp, err = _normalise_sdk_result(result)
if err is not None:
return all_items, err
all_items.extend(items)
return all_items, None
@staticmethod
def _build_zone(zone) -> "OktaNetworkZone":
zone_id = _value(getattr(zone, "id", None))
return OktaNetworkZone(
id=zone_id,
name=_value(getattr(zone, "name", None)) or zone_id,
status=_value(getattr(zone, "status", None)),
type=_value(getattr(zone, "type", None)),
usage=_value(getattr(zone, "usage", None)),
system=bool(getattr(zone, "system", False)),
gateways=list(getattr(zone, "gateways", None) or []),
proxies=list(getattr(zone, "proxies", None) or []),
asns=list(getattr(zone, "asns", None) or []),
locations=list(getattr(zone, "locations", None) or []),
ip_service_categories=[
_value(category)
for category in (getattr(zone, "ip_service_categories", None) or [])
],
)
class OktaNetworkZone(BaseModel):
"""Normalized Okta Network Zone attributes used by checks."""
id: str
name: str
status: str = ""
type: str = ""
usage: str = ""
system: bool = False
gateways: list[str] = Field(default_factory=list)
proxies: list[str] = Field(default_factory=list)
asns: list[str] = Field(default_factory=list)
locations: list[str] = Field(default_factory=list)
ip_service_categories: list[str] = Field(default_factory=list)
class NetworkZoneSummary(BaseModel):
"""Synthetic resource for org-level Network Zone findings."""
id: str = "okta-network-zones"
name: str = "Okta Network Zones"
+13 -1
View File
@@ -11,12 +11,24 @@ def set_mocked_okta_provider(
session: OktaSession = None,
identity: OktaIdentityInfo = None,
audit_config: dict = None,
scopes: list[str] = None,
):
if session is None:
session = OktaSession(
org_domain=OKTA_ORG_DOMAIN,
client_id=OKTA_CLIENT_ID,
scopes=["okta.policies.read", "okta.brands.read"],
scopes=(
scopes
if scopes is not None
else [
"okta.policies.read",
"okta.brands.read",
"okta.networkZones.read",
"okta.apiTokens.read",
"okta.roles.read",
"okta.authenticators.read",
]
),
private_key=OKTA_PRIVATE_KEY,
)
if identity is None:
@@ -0,0 +1,41 @@
from unittest import mock
from prowler.providers.okta.services.apitoken.api_token_service import OktaApiToken
from tests.providers.okta.okta_fixtures import set_mocked_okta_provider
def build_api_token_client(
tokens: dict = None,
known_network_zone_ids: set[str] = None,
missing_scopes: list[str] = None,
):
client = mock.MagicMock()
client.api_tokens = tokens or {}
client.known_network_zone_ids = known_network_zone_ids or {"nzo-corp"}
client.missing_scopes = missing_scopes or []
client.provider = set_mocked_okta_provider()
return client
def api_token(
token_id: str = "00Tabcdefg1234567890",
name: str = "CI token",
*,
user_id: str = "00uabcdefg1234567890",
network_connection: str = "ZONE",
network_includes: list[str] = None,
network_excludes: list[str] = None,
owner_roles: list[str] = None,
):
return OktaApiToken(
id=token_id,
name=name,
client_name="Okta API",
user_id=user_id,
network_connection=network_connection,
network_includes=(
network_includes if network_includes is not None else ["nzo-corp"]
),
network_excludes=network_excludes or [],
owner_roles=owner_roles or ["READ_ONLY_ADMIN"],
)
@@ -0,0 +1,248 @@
from types import SimpleNamespace
from unittest import mock
import pytest
from prowler.providers.okta.okta_provider import DEFAULT_SCOPES
from prowler.providers.okta.services.apitoken.api_token_service import (
API_TOKENS_READ_SCOPE,
NETWORK_ZONES_READ_SCOPE,
ROLES_READ_SCOPE,
ApiToken,
)
from tests.providers.okta.okta_fixtures import set_mocked_okta_provider
def _resp(headers: dict = None):
return SimpleNamespace(headers=headers or {})
def _sdk_token(
token_id: str = "00Tabcdefg1234567890",
name: str = "CI token",
*,
user_id: str = "00uabcdefg1234567890",
connection: str = "ZONE",
include: list[str] = None,
exclude: list[str] = None,
):
return SimpleNamespace(
id=token_id,
name=name,
client_name="Okta API",
user_id=user_id,
network=SimpleNamespace(
connection=connection,
include=include if include is not None else ["nzo-corp"],
exclude=exclude or [],
),
)
def _sdk_role(role_type: str):
return SimpleNamespace(type=role_type, label=role_type.replace("_", " ").title())
def _sdk_zone(zone_id: str, name: str):
return SimpleNamespace(id=zone_id, name=name)
class Test_ApiToken_service:
def test_fetches_tokens_roles_and_known_network_zones(self):
provider = set_mocked_okta_provider()
token = _sdk_token()
async def fake_list_api_tokens():
return ([token], _resp({}), None)
async def fake_list_assigned_roles_for_user(user_id):
assert user_id == token.user_id
return ([_sdk_role("READ_ONLY_ADMIN")], _resp({}), None)
async def fake_list_network_zones(after=None, limit=None):
assert after is None
assert limit == 200
return ([_sdk_zone("nzo-corp", "Corporate")], _resp({}), None)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_api_tokens = fake_list_api_tokens
mocked.list_assigned_roles_for_user = fake_list_assigned_roles_for_user
mocked.list_network_zones = fake_list_network_zones
mocked_client_cls.return_value = mocked
service = ApiToken(provider)
assert set(service.api_tokens.keys()) == {token.id}
assert service.api_tokens[token.id].network_connection == "ZONE"
assert service.api_tokens[token.id].owner_roles == ["READ_ONLY_ADMIN"]
assert service.known_network_zone_ids == {"nzo-corp", "Corporate"}
def test_role_fetch_error_keeps_token_with_empty_roles(self):
provider = set_mocked_okta_provider()
token = _sdk_token()
async def fake_list_api_tokens():
return ([token], _resp({}), None)
async def fake_roles_error(user_id):
assert user_id == token.user_id
return ([], _resp({}), Exception("forbidden"))
async def fake_list_network_zones(after=None, limit=None):
assert after is None
assert limit == 200
return ([], _resp({}), None)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_api_tokens = fake_list_api_tokens
mocked.list_assigned_roles_for_user = fake_roles_error
mocked.list_network_zones = fake_list_network_zones
mocked_client_cls.return_value = mocked
service = ApiToken(provider)
assert service.api_tokens[token.id].owner_roles == []
def test_paginates_known_network_zones_for_token_validation(self):
provider = set_mocked_okta_provider()
token = _sdk_token(include=["nzo-page-2"])
next_link = '<https://acme.okta.com/api/v1/zones?after=cursor-2>; rel="next"'
async def fake_list_api_tokens():
return ([token], _resp({}), None)
async def fake_list_assigned_roles_for_user(user_id):
assert user_id == token.user_id
return ([_sdk_role("READ_ONLY_ADMIN")], _resp({}), None)
async def fake_list_network_zones(after=None, limit=None):
assert limit == 200
if after is None:
return (
[_sdk_zone("nzo-page-1", "First")],
_resp({"link": next_link}),
None,
)
return ([_sdk_zone("nzo-page-2", "Second")], _resp({}), None)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_api_tokens = fake_list_api_tokens
mocked.list_assigned_roles_for_user = fake_list_assigned_roles_for_user
mocked.list_network_zones = fake_list_network_zones
mocked_client_cls.return_value = mocked
service = ApiToken(provider)
assert service.known_network_zone_ids == {
"nzo-page-1",
"First",
"nzo-page-2",
"Second",
}
def test_returns_empty_on_token_api_error(self):
provider = set_mocked_okta_provider()
async def failing():
return ([], _resp({}), Exception("forbidden"))
async def fake_list_network_zones(after=None, limit=None):
assert after is None
assert limit == 200
return ([], _resp({}), None)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_api_tokens = failing
mocked.list_network_zones = fake_list_network_zones
mocked_client_cls.return_value = mocked
service = ApiToken(provider)
assert service.api_tokens == {}
@pytest.mark.parametrize(
"missing_scope, expected_calls, expected_tokens, expected_zones, expected_roles",
[
(
API_TOKENS_READ_SCOPE,
[],
set(),
set(),
None,
),
(
NETWORK_ZONES_READ_SCOPE,
["list_api_tokens", "list_assigned_roles_for_user"],
{"00Tabcdefg1234567890"},
set(),
["READ_ONLY_ADMIN"],
),
(
ROLES_READ_SCOPE,
["list_network_zones", "list_api_tokens"],
{"00Tabcdefg1234567890"},
{"nzo-corp", "Corporate"},
[],
),
],
)
def test_missing_scope_skips_corresponding_sdk_call(
self,
missing_scope,
expected_calls,
expected_tokens,
expected_zones,
expected_roles,
):
provider = set_mocked_okta_provider(
scopes=[scope for scope in DEFAULT_SCOPES if scope != missing_scope]
)
token = _sdk_token()
calls = []
async def fake_list_api_tokens():
if missing_scope == API_TOKENS_READ_SCOPE:
raise AssertionError("list_api_tokens must not be called")
calls.append("list_api_tokens")
return ([token], _resp({}), None)
async def fake_list_assigned_roles_for_user(user_id):
if missing_scope == ROLES_READ_SCOPE:
raise AssertionError("list_assigned_roles_for_user must not be called")
assert user_id == token.user_id
calls.append("list_assigned_roles_for_user")
return ([_sdk_role("READ_ONLY_ADMIN")], _resp({}), None)
async def fake_list_network_zones(after=None, limit=None):
if missing_scope in {API_TOKENS_READ_SCOPE, NETWORK_ZONES_READ_SCOPE}:
raise AssertionError("list_network_zones must not be called")
assert after is None
assert limit == 200
calls.append("list_network_zones")
return ([_sdk_zone("nzo-corp", "Corporate")], _resp({}), None)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_api_tokens = fake_list_api_tokens
mocked.list_assigned_roles_for_user = fake_list_assigned_roles_for_user
mocked.list_network_zones = fake_list_network_zones
mocked_client_cls.return_value = mocked
service = ApiToken(provider)
assert service.missing_scopes == [missing_scope]
assert set(service.api_tokens.keys()) == expected_tokens
assert service.known_network_zone_ids == expected_zones
if expected_roles is not None:
assert service.api_tokens[token.id].owner_roles == expected_roles
assert calls == expected_calls
@@ -0,0 +1,60 @@
from unittest import mock
import pytest
from tests.providers.okta.okta_fixtures import set_mocked_okta_provider
from tests.providers.okta.services.api_token.api_token_fixtures import (
api_token,
build_api_token_client,
)
CHECK_PATH = (
"prowler.providers.okta.services.apitoken."
"apitoken_not_super_admin.apitoken_not_super_admin.api_token_client"
)
def _run_check(api_token_client):
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_okta_provider(),
),
mock.patch(CHECK_PATH, new=api_token_client),
):
from prowler.providers.okta.services.apitoken.apitoken_not_super_admin.apitoken_not_super_admin import (
apitoken_not_super_admin,
)
return apitoken_not_super_admin().execute()
class Test_apitoken_not_super_admin:
@pytest.mark.parametrize(
"missing_scope", ["okta.apiTokens.read", "okta.roles.read"]
)
def test_missing_required_scope_returns_manual(self, missing_scope):
findings = _run_check(
build_api_token_client({}, missing_scopes=[missing_scope])
)
assert len(findings) == 1
assert findings[0].status == "MANUAL"
assert missing_scope in findings[0].status_extended
def test_no_tokens_returns_no_findings(self):
findings = _run_check(build_api_token_client({}))
assert findings == []
def test_token_owner_without_super_admin_passes(self):
token = api_token(owner_roles=["READ_ONLY_ADMIN"])
findings = _run_check(build_api_token_client({token.id: token}))
assert len(findings) == 1
assert findings[0].status == "PASS"
assert findings[0].resource_id == token.id
def test_token_owner_with_super_admin_fails(self):
token = api_token(owner_roles=["SUPER_ADMIN"])
findings = _run_check(build_api_token_client({token.id: token}))
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "Super Admin" in findings[0].status_extended
@@ -0,0 +1,75 @@
from unittest import mock
import pytest
from tests.providers.okta.okta_fixtures import set_mocked_okta_provider
from tests.providers.okta.services.api_token.api_token_fixtures import (
api_token,
build_api_token_client,
)
CHECK_PATH = (
"prowler.providers.okta.services.apitoken."
"apitoken_restricted_to_network_zone.apitoken_restricted_to_network_zone.api_token_client"
)
def _run_check(api_token_client):
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_okta_provider(),
),
mock.patch(CHECK_PATH, new=api_token_client),
):
from prowler.providers.okta.services.apitoken.apitoken_restricted_to_network_zone.apitoken_restricted_to_network_zone import (
apitoken_restricted_to_network_zone,
)
return apitoken_restricted_to_network_zone().execute()
class Test_apitoken_restricted_to_network_zone:
@pytest.mark.parametrize(
"missing_scope", ["okta.apiTokens.read", "okta.networkZones.read"]
)
def test_missing_required_scope_returns_manual(self, missing_scope):
findings = _run_check(
build_api_token_client({}, missing_scopes=[missing_scope])
)
assert len(findings) == 1
assert findings[0].status == "MANUAL"
assert missing_scope in findings[0].status_extended
def test_no_tokens_returns_no_findings(self):
findings = _run_check(build_api_token_client({}))
assert findings == []
def test_token_restricted_to_known_network_zone_passes(self):
token = api_token(network_connection="ZONE", network_includes=["nzo-corp"])
findings = _run_check(
build_api_token_client(
{token.id: token}, known_network_zone_ids={"nzo-corp"}
)
)
assert len(findings) == 1
assert findings[0].status == "PASS"
assert findings[0].resource_id == token.id
def test_token_open_to_anywhere_fails(self):
token = api_token(network_connection="ANYWHERE", network_includes=[])
findings = _run_check(build_api_token_client({token.id: token}))
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "from any IP" in findings[0].status_extended
def test_token_restricted_to_unknown_zone_fails(self):
token = api_token(network_connection="ZONE", network_includes=["nzo-missing"])
findings = _run_check(
build_api_token_client(
{token.id: token}, known_network_zone_ids={"nzo-corp"}
)
)
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "unknown Network Zone" in findings[0].status_extended
@@ -0,0 +1,73 @@
from unittest import mock
from prowler.providers.okta.services.authenticator.authenticator_service import (
OktaAuthenticator,
PasswordPolicy,
)
from tests.providers.okta.okta_fixtures import set_mocked_okta_provider
def build_authenticator_client(
password_policies: dict = None,
authenticators: dict = None,
missing_scopes: list[str] = None,
):
client = mock.MagicMock()
client.password_policies = password_policies or {}
client.authenticators = authenticators or {}
client.missing_scopes = missing_scopes or []
client.provider = set_mocked_okta_provider()
return client
def password_policy(
policy_id: str = "pol-password",
name: str = "Default Password Policy",
*,
status: str = "ACTIVE",
priority: int = 1,
max_attempts: int = 3,
min_length: int = 15,
min_upper_case: int = 1,
min_lower_case: int = 1,
min_number: int = 1,
min_symbol: int = 1,
min_age_minutes: int = 1440,
max_age_days: int = 60,
history_count: int = 5,
common_password_check: bool = True,
):
return PasswordPolicy(
id=policy_id,
name=name,
status=status,
priority=priority,
max_attempts=max_attempts,
min_length=min_length,
min_upper_case=min_upper_case,
min_lower_case=min_lower_case,
min_number=min_number,
min_symbol=min_symbol,
min_age_minutes=min_age_minutes,
max_age_days=max_age_days,
history_count=history_count,
common_password_check=common_password_check,
)
def authenticator(
auth_id: str = "aut-okta-verify",
key: str = "okta_verify",
name: str = "Okta Verify",
*,
status: str = "ACTIVE",
fips: str = "REQUIRED",
):
return OktaAuthenticator(
id=auth_id,
key=key,
name=name,
status=status,
type="app",
fips=fips,
)
@@ -0,0 +1,64 @@
from unittest import mock
from tests.providers.okta.okta_fixtures import set_mocked_okta_provider
from tests.providers.okta.services.authenticator.authenticator_fixtures import (
authenticator,
build_authenticator_client,
)
CHECK_PATH = (
"prowler.providers.okta.services.authenticator."
"authenticator_okta_verify_fips_compliant."
"authenticator_okta_verify_fips_compliant.authenticator_client"
)
def _run_check(authenticator_client):
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_okta_provider(),
),
mock.patch(CHECK_PATH, new=authenticator_client),
):
from prowler.providers.okta.services.authenticator.authenticator_okta_verify_fips_compliant.authenticator_okta_verify_fips_compliant import (
authenticator_okta_verify_fips_compliant,
)
return authenticator_okta_verify_fips_compliant().execute()
class Test_authenticator_okta_verify_fips_compliant:
def test_missing_authenticators_scope_returns_manual(self):
findings = _run_check(
build_authenticator_client(
authenticators={}, missing_scopes=["okta.authenticators.read"]
)
)
assert len(findings) == 1
assert findings[0].status == "MANUAL"
assert "okta.authenticators.read" in findings[0].status_extended
def test_okta_verify_fips_required_passes(self):
okta_verify = authenticator(key="okta_verify", fips="REQUIRED")
findings = _run_check(
build_authenticator_client(authenticators={okta_verify.id: okta_verify})
)
assert len(findings) == 1
assert findings[0].status == "PASS"
assert findings[0].resource_id == okta_verify.id
def test_okta_verify_without_fips_required_fails(self):
okta_verify = authenticator(key="okta_verify", fips="OPTIONAL")
findings = _run_check(
build_authenticator_client(authenticators={okta_verify.id: okta_verify})
)
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "FIPS" in findings[0].status_extended
def test_missing_okta_verify_fails(self):
findings = _run_check(build_authenticator_client(authenticators={}))
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "Okta Verify authenticator is not active" in findings[0].status_extended
@@ -0,0 +1,212 @@
from unittest import mock
import pytest
from tests.providers.okta.okta_fixtures import set_mocked_okta_provider
from tests.providers.okta.services.authenticator.authenticator_fixtures import (
build_authenticator_client,
password_policy,
)
PASSWORD_POLICY_CHECK_CASES = [
(
"authenticator_password_common_password_check",
"common_password_check",
True,
False,
"common-password dictionary checks",
),
(
"authenticator_password_complexity_lowercase",
"min_lower_case",
1,
0,
"at least one lowercase character",
),
(
"authenticator_password_complexity_number",
"min_number",
1,
0,
"at least one numeric character",
),
(
"authenticator_password_complexity_symbol",
"min_symbol",
1,
0,
"at least one symbol character",
),
(
"authenticator_password_complexity_uppercase",
"min_upper_case",
1,
0,
"at least one uppercase character",
),
(
"authenticator_password_history_5",
"history_count",
5,
4,
"password history of at least 5 previous passwords",
),
(
"authenticator_password_lockout_threshold_3",
"max_attempts",
3,
4,
"password lockout after 3 or fewer failed attempts",
),
(
"authenticator_password_maximum_age_60d",
"max_age_days",
60,
61,
"maximum password age of 60 days or less",
),
(
"authenticator_password_minimum_age_24h",
"min_age_minutes",
1440,
1439,
"minimum password age of at least 24 hours",
),
(
"authenticator_password_minimum_length_15",
"min_length",
15,
14,
"minimum password length of at least 15 characters",
),
]
def _run_password_policy_check(check_name: str, authenticator_client):
check_path = (
f"prowler.providers.okta.services.authenticator.{check_name}."
f"{check_name}.authenticator_client"
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_okta_provider(),
),
mock.patch(check_path, new=authenticator_client),
):
module = __import__(
f"prowler.providers.okta.services.authenticator.{check_name}.{check_name}",
fromlist=[check_name],
)
return getattr(module, check_name)().execute()
class Test_authenticator_password_policy_checks:
@pytest.mark.parametrize(
"check_name, field_name, compliant_value, non_compliant_value, expected_phrase",
PASSWORD_POLICY_CHECK_CASES,
)
def test_missing_policies_scope_returns_manual(
self,
check_name,
field_name,
compliant_value,
non_compliant_value,
expected_phrase,
):
findings = _run_password_policy_check(
check_name,
build_authenticator_client(
password_policies={}, missing_scopes=["okta.policies.read"]
),
)
assert len(findings) == 1
assert findings[0].status == "MANUAL"
assert "okta.policies.read" in findings[0].status_extended
@pytest.mark.parametrize(
"check_name, field_name, compliant_value, non_compliant_value, expected_phrase",
PASSWORD_POLICY_CHECK_CASES,
)
def test_no_active_password_policies_fails(
self,
check_name,
field_name,
compliant_value,
non_compliant_value,
expected_phrase,
):
findings = _run_password_policy_check(
check_name, build_authenticator_client(password_policies={})
)
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "No active Okta Password Policies" in findings[0].status_extended
assert expected_phrase in findings[0].status_extended
@pytest.mark.parametrize(
"check_name, field_name, compliant_value, non_compliant_value, expected_phrase",
PASSWORD_POLICY_CHECK_CASES,
)
def test_compliant_password_policy_passes(
self,
check_name,
field_name,
compliant_value,
non_compliant_value,
expected_phrase,
):
policy = password_policy(**{field_name: compliant_value})
findings = _run_password_policy_check(
check_name,
build_authenticator_client(password_policies={policy.id: policy}),
)
assert len(findings) == 1
assert findings[0].status == "PASS"
assert findings[0].resource_id == policy.id
assert expected_phrase in findings[0].status_extended
@pytest.mark.parametrize(
"check_name, field_name, compliant_value, non_compliant_value, expected_phrase",
PASSWORD_POLICY_CHECK_CASES,
)
def test_non_compliant_password_policy_fails(
self,
check_name,
field_name,
compliant_value,
non_compliant_value,
expected_phrase,
):
policy = password_policy(**{field_name: non_compliant_value})
findings = _run_password_policy_check(
check_name,
build_authenticator_client(password_policies={policy.id: policy}),
)
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert findings[0].resource_id == policy.id
assert expected_phrase in findings[0].status_extended
def test_multiple_active_password_policies_emit_one_finding_each(self):
check_name = "authenticator_password_minimum_length_15"
compliant = password_policy(policy_id="pol-good", name="Strict", min_length=15)
weak = password_policy(
policy_id="pol-weak", name="Weak", min_length=8, priority=2
)
findings = _run_password_policy_check(
check_name,
build_authenticator_client(
password_policies={compliant.id: compliant, weak.id: weak}
),
)
assert len(findings) == 2
by_name = {finding.resource_name: finding for finding in findings}
assert by_name["Strict"].status == "PASS"
assert by_name["Weak"].status == "FAIL"
@@ -0,0 +1,203 @@
from types import SimpleNamespace
from unittest import mock
import pytest
from prowler.providers.okta.okta_provider import DEFAULT_SCOPES
from prowler.providers.okta.services.authenticator.authenticator_service import (
AUTHENTICATORS_READ_SCOPE,
POLICIES_READ_SCOPE,
Authenticator,
OktaAuthenticator,
PasswordPolicy,
)
from tests.providers.okta.okta_fixtures import set_mocked_okta_provider
def _resp(headers: dict = None):
return SimpleNamespace(headers=headers or {})
def _sdk_password_policy(policy_id: str = "pol-password", name: str = "Default"):
return SimpleNamespace(
id=policy_id,
name=name,
priority=1,
status="ACTIVE",
system=True,
settings=SimpleNamespace(
password=SimpleNamespace(
lockout=SimpleNamespace(max_attempts=3),
complexity=SimpleNamespace(
min_length=15,
min_upper_case=1,
min_lower_case=1,
min_number=1,
min_symbol=1,
dictionary=SimpleNamespace(common=True),
),
age=SimpleNamespace(
min_age_minutes=1440,
max_age_days=60,
history_count=5,
),
)
),
)
def _sdk_authenticator(
auth_id: str = "aut-okta-verify",
key: str = "okta_verify",
status: str = "ACTIVE",
fips: str = "REQUIRED",
):
return SimpleNamespace(
id=auth_id,
key=key,
name="Okta Verify" if key == "okta_verify" else "Smart Card IdP",
status=status,
type="app",
settings=SimpleNamespace(compliance=SimpleNamespace(fips=fips)),
)
class Test_Authenticator_service:
def test_fetches_password_policies_and_authenticators(self):
provider = set_mocked_okta_provider()
policy = _sdk_password_policy()
okta_verify = _sdk_authenticator()
async def fake_list_policies(type, after=None):
assert type == "PASSWORD"
assert after is None
return ([policy], _resp({}), None)
async def fake_list_authenticators():
return ([okta_verify], _resp({}), None)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_policies = fake_list_policies
mocked.list_authenticators = fake_list_authenticators
mocked_client_cls.return_value = mocked
service = Authenticator(provider)
assert isinstance(service.password_policies[policy.id], PasswordPolicy)
assert service.password_policies[policy.id].min_length == 15
assert isinstance(service.authenticators[okta_verify.id], OktaAuthenticator)
assert service.authenticators[okta_verify.id].fips == "REQUIRED"
def test_returns_empty_collections_on_api_errors(self):
provider = set_mocked_okta_provider()
async def failing_policies(type, after=None):
assert type == "PASSWORD"
assert after is None
return ([], _resp({}), Exception("forbidden"))
async def failing_authenticators():
return ([], _resp({}), Exception("forbidden"))
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_policies = failing_policies
mocked.list_authenticators = failing_authenticators
mocked_client_cls.return_value = mocked
service = Authenticator(provider)
assert service.password_policies == {}
assert service.authenticators == {}
def test_paginates_password_policies(self):
provider = set_mocked_okta_provider()
page_1 = _sdk_password_policy("pol-1", "First")
page_2 = _sdk_password_policy("pol-2", "Second")
next_link = '<https://acme.okta.com/api/v1/policies?after=cursor-2>; rel="next"'
calls = []
async def fake_list_policies(type, after=None):
assert type == "PASSWORD"
calls.append(after)
if after is None:
return ([page_1], _resp({"link": next_link}), None)
return ([page_2], _resp({}), None)
async def fake_list_authenticators():
return ([], _resp({}), None)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_policies = fake_list_policies
mocked.list_authenticators = fake_list_authenticators
mocked_client_cls.return_value = mocked
service = Authenticator(provider)
assert calls == [None, "cursor-2"]
assert set(service.password_policies.keys()) == {"pol-1", "pol-2"}
@pytest.mark.parametrize(
"missing_scope, expected_calls, expected_policies, expected_authenticators",
[
(
POLICIES_READ_SCOPE,
["list_authenticators"],
set(),
{"aut-okta-verify"},
),
(
AUTHENTICATORS_READ_SCOPE,
["list_policies"],
{"pol-password"},
set(),
),
],
)
def test_missing_scope_skips_corresponding_sdk_call(
self,
missing_scope,
expected_calls,
expected_policies,
expected_authenticators,
):
provider = set_mocked_okta_provider(
scopes=[scope for scope in DEFAULT_SCOPES if scope != missing_scope]
)
policy = _sdk_password_policy()
okta_verify = _sdk_authenticator()
calls = []
async def fake_list_policies(type, after=None):
if missing_scope == POLICIES_READ_SCOPE:
raise AssertionError("list_policies must not be called")
assert type == "PASSWORD"
assert after is None
calls.append("list_policies")
return ([policy], _resp({}), None)
async def fake_list_authenticators():
if missing_scope == AUTHENTICATORS_READ_SCOPE:
raise AssertionError("list_authenticators must not be called")
calls.append("list_authenticators")
return ([okta_verify], _resp({}), None)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_policies = fake_list_policies
mocked.list_authenticators = fake_list_authenticators
mocked_client_cls.return_value = mocked
service = Authenticator(provider)
assert service.missing_scopes == [missing_scope]
assert set(service.password_policies.keys()) == expected_policies
assert set(service.authenticators.keys()) == expected_authenticators
assert calls == expected_calls
@@ -0,0 +1,73 @@
from unittest import mock
from tests.providers.okta.okta_fixtures import set_mocked_okta_provider
from tests.providers.okta.services.authenticator.authenticator_fixtures import (
authenticator,
build_authenticator_client,
)
CHECK_PATH = (
"prowler.providers.okta.services.authenticator."
"authenticator_smart_card_active.authenticator_smart_card_active.authenticator_client"
)
def _run_check(authenticator_client):
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_okta_provider(),
),
mock.patch(CHECK_PATH, new=authenticator_client),
):
from prowler.providers.okta.services.authenticator.authenticator_smart_card_active.authenticator_smart_card_active import (
authenticator_smart_card_active,
)
return authenticator_smart_card_active().execute()
class Test_authenticator_smart_card_active:
def test_missing_authenticators_scope_returns_manual(self):
findings = _run_check(
build_authenticator_client(
authenticators={}, missing_scopes=["okta.authenticators.read"]
)
)
assert len(findings) == 1
assert findings[0].status == "MANUAL"
assert "okta.authenticators.read" in findings[0].status_extended
def test_smart_card_active_passes(self):
smart_card = authenticator(
auth_id="aut-smart-card",
key="smart_card_idp",
name="Smart Card IdP",
status="ACTIVE",
)
findings = _run_check(
build_authenticator_client(authenticators={smart_card.id: smart_card})
)
assert len(findings) == 1
assert findings[0].status == "PASS"
assert findings[0].resource_id == smart_card.id
def test_missing_smart_card_fails(self):
findings = _run_check(build_authenticator_client(authenticators={}))
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "not active" in findings[0].status_extended
def test_inactive_smart_card_fails(self):
smart_card = authenticator(
auth_id="aut-smart-card",
key="smart_card_idp",
name="Smart Card IdP",
status="INACTIVE",
)
findings = _run_check(
build_authenticator_client(authenticators={smart_card.id: smart_card})
)
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "INACTIVE" in findings[0].status_extended
@@ -0,0 +1,87 @@
from unittest import mock
from tests.providers.okta.okta_fixtures import set_mocked_okta_provider
from tests.providers.okta.services.network_zone.network_zone_fixtures import (
build_network_zone_client,
network_zone,
)
CHECK_PATH = (
"prowler.providers.okta.services.network."
"network_zone_block_anonymized_proxies."
"network_zone_block_anonymized_proxies.network_zone_client"
)
def _run_check(network_zone_client):
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_okta_provider(),
),
mock.patch(CHECK_PATH, new=network_zone_client),
):
from prowler.providers.okta.services.network.network_zone_block_anonymized_proxies.network_zone_block_anonymized_proxies import (
network_zone_block_anonymized_proxies,
)
return network_zone_block_anonymized_proxies().execute()
class Test_network_zone_block_anonymized_proxies:
def test_missing_network_zone_scope_returns_manual(self):
findings = _run_check(
build_network_zone_client({}, missing_scopes=["okta.networkZones.read"])
)
assert len(findings) == 1
assert findings[0].status == "MANUAL"
assert "okta.networkZones.read" in findings[0].status_extended
def test_no_zones_fails(self):
findings = _run_check(build_network_zone_client({}))
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "No active Okta Network Zone blocklist" in findings[0].status_extended
def test_pass_with_active_ip_blocklist_gateway(self):
zone = network_zone(gateways=["198.51.100.10/32"])
findings = _run_check(build_network_zone_client({zone.id: zone}))
assert len(findings) == 1
assert findings[0].status == "PASS"
assert findings[0].resource_id == zone.id
assert "gateway" in findings[0].status_extended
def test_pass_with_active_enhanced_dynamic_anonymizer_blocklist(self):
zone = network_zone(
zone_id="nzo-enhanced",
name="DefaultEnhancedDynamicZone",
zone_type="DYNAMIC_V2",
system=True,
ip_service_categories=["ANONYMIZER"],
)
findings = _run_check(build_network_zone_client({zone.id: zone}))
assert len(findings) == 1
assert findings[0].status == "PASS"
assert "Enhanced Dynamic" in findings[0].status_extended
def test_existing_zones_without_anonymized_proxy_blocklist_fail(self):
policy_zone = network_zone(
zone_id="nzo-policy",
name="Corporate Policy Zone",
usage="POLICY",
gateways=["10.0.0.0/8"],
)
inactive_blocklist = network_zone(
zone_id="nzo-inactive",
name="Inactive Blocklist",
status="INACTIVE",
gateways=["203.0.113.0/24"],
)
findings = _run_check(
build_network_zone_client(
{policy_zone.id: policy_zone, inactive_blocklist.id: inactive_blocklist}
)
)
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "do not actively block" in findings[0].status_extended
@@ -0,0 +1,37 @@
from unittest import mock
from prowler.providers.okta.services.network.network_zone_service import OktaNetworkZone
from tests.providers.okta.okta_fixtures import set_mocked_okta_provider
def build_network_zone_client(zones: dict = None, missing_scopes: list[str] = None):
client = mock.MagicMock()
client.network_zones = zones or {}
client.missing_scopes = missing_scopes or []
client.provider = set_mocked_okta_provider()
return client
def network_zone(
zone_id: str = "nzo-1",
name: str = "BlockedIpZone",
*,
status: str = "ACTIVE",
zone_type: str = "IP",
usage: str = "BLOCKLIST",
system: bool = False,
gateways: list[str] = None,
proxies: list[str] = None,
ip_service_categories: list[str] = None,
):
return OktaNetworkZone(
id=zone_id,
name=name,
status=status,
type=zone_type,
usage=usage,
system=system,
gateways=gateways or [],
proxies=proxies or [],
ip_service_categories=ip_service_categories or [],
)
@@ -0,0 +1,153 @@
from types import SimpleNamespace
from unittest import mock
import pytest
from prowler.providers.okta.services.network.network_zone_service import (
NETWORK_ZONES_READ_SCOPE,
NetworkZone,
OktaNetworkZone,
_next_after_cursor,
)
from tests.providers.okta.okta_fixtures import set_mocked_okta_provider
def _resp(headers: dict = None):
return SimpleNamespace(headers=headers or {})
def _sdk_zone(
zone_id: str,
name: str,
*,
status: str = "ACTIVE",
zone_type: str = "IP",
usage: str = "BLOCKLIST",
system: bool = False,
gateways: list[str] = None,
proxies: list[str] = None,
ip_service_categories: list[str] = None,
):
return SimpleNamespace(
id=zone_id,
name=name,
status=status,
type=zone_type,
usage=usage,
system=system,
gateways=gateways or [],
proxies=proxies or [],
ip_service_categories=ip_service_categories or [],
)
class Test_network_zone_pagination:
def test_no_link_header_returns_none(self):
assert _next_after_cursor(_resp({})) is None
def test_extracts_next_after_cursor(self):
link = (
'<https://acme.okta.com/api/v1/zones?limit=20>; rel="self", '
'<https://acme.okta.com/api/v1/zones?after=next-page>; rel="next"'
)
assert _next_after_cursor(_resp({"Link": link})) == "next-page"
class Test_NetworkZone_service:
def test_fetches_ip_and_enhanced_dynamic_zones(self):
provider = set_mocked_okta_provider()
ip_zone = _sdk_zone(
"nzo-ip",
"Blocked IPs",
gateways=["203.0.113.10/32"],
)
enhanced_zone = _sdk_zone(
"nzo-enhanced",
"DefaultEnhancedDynamicZone",
zone_type="DYNAMIC_V2",
system=True,
ip_service_categories=["ANONYMIZER"],
)
async def fake_list_network_zones(after=None, limit=None):
assert after is None
assert limit == 200
return ([ip_zone, enhanced_zone], _resp({}), None)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_network_zones = fake_list_network_zones
mocked_client_cls.return_value = mocked
service = NetworkZone(provider)
assert set(service.network_zones.keys()) == {"nzo-ip", "nzo-enhanced"}
assert isinstance(service.network_zones["nzo-ip"], OktaNetworkZone)
assert service.network_zones["nzo-ip"].gateways == ["203.0.113.10/32"]
assert service.network_zones["nzo-enhanced"].type == "DYNAMIC_V2"
assert service.network_zones["nzo-enhanced"].ip_service_categories == [
"ANONYMIZER"
]
def test_paginates_network_zones(self):
provider = set_mocked_okta_provider()
page_1 = _sdk_zone("nzo-1", "First")
page_2 = _sdk_zone("nzo-2", "Second")
next_link = '<https://acme.okta.com/api/v1/zones?after=cursor-2>; rel="next"'
calls = []
async def fake_list_network_zones(after=None, limit=None):
assert limit == 200
calls.append(after)
if after is None:
return ([page_1], _resp({"link": next_link}), None)
return ([page_2], _resp({}), None)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_network_zones = fake_list_network_zones
mocked_client_cls.return_value = mocked
service = NetworkZone(provider)
assert calls == [None, "cursor-2"]
assert set(service.network_zones.keys()) == {"nzo-1", "nzo-2"}
def test_returns_empty_on_api_error(self):
provider = set_mocked_okta_provider()
async def failing(after=None, limit=None):
assert after is None
assert limit == 200
return ([], _resp({}), Exception("forbidden"))
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_network_zones = failing
mocked_client_cls.return_value = mocked
service = NetworkZone(provider)
assert service.network_zones == {}
@pytest.mark.parametrize("missing_scope", [NETWORK_ZONES_READ_SCOPE])
def test_missing_scope_skips_network_zones_sdk_call(self, missing_scope):
provider = set_mocked_okta_provider(scopes=[])
async def forbidden_list_network_zones(after=None, limit=None):
raise AssertionError("list_network_zones must not be called")
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_network_zones = forbidden_list_network_zones
mocked_client_cls.return_value = mocked
service = NetworkZone(provider)
assert service.missing_scopes == [missing_scope]
assert service.network_zones == {}