feat(okta): add API token STIG checks (#11464)

Co-authored-by: Daniel Barranquero <danielbo2001@gmail.com>
This commit is contained in:
Hugo Pereira Brito
2026-06-08 17:11:54 +02:00
committed by GitHub
parent 7692a1d76a
commit 0ea2f6d67e
21 changed files with 1724 additions and 12 deletions
@@ -0,0 +1,147 @@
"""Tests for the shared Okta pagination helpers in
`prowler.providers.okta.lib.service.pagination`.
Covers `next_after_cursor` (extracts the `after` query param from an
RFC 5988 `Link: rel="next"` header) and `paginate` (drains all pages
of an SDK list call by following the cursor).
These tests were carved out of `network_zone_service_test.py` when its
local pagination helpers were replaced by the shared module — they now
cover code that six Okta services depend on.
"""
import asyncio
from types import SimpleNamespace
from prowler.providers.okta.lib.service.pagination import (
next_after_cursor,
paginate,
)
def _run(coro):
return asyncio.run(coro)
def _resp(headers: dict = None):
return SimpleNamespace(headers=headers or {})
class Test_next_after_cursor:
"""Behaviours previously covered in `network_zone_service_test.py`
under `Test_network_zone_pagination` — relocated here when the
local helper was replaced by the shared module.
"""
def test_returns_none_when_response_is_none(self):
assert next_after_cursor(None) is None
def test_returns_none_when_no_link_header(self):
assert next_after_cursor(_resp({})) is None
def test_extracts_next_after_cursor(self):
link = (
'<https://acme.okta.com/api/v1/zones?limit=20>; rel="self", '
'<https://acme.okta.com/api/v1/zones?after=next-page>; rel="next"'
)
assert next_after_cursor(_resp({"Link": link})) == "next-page"
def test_reads_lowercase_link_header(self):
# aiohttp's `CIMultiDict` is case-insensitive in practice, but
# callers occasionally pass a dict, so we check both spellings.
link = '<https://acme.okta.com/api/v1/zones?after=cursor-1>; rel="next"'
assert next_after_cursor(_resp({"link": link})) == "cursor-1"
def test_next_link_without_after_query_returns_none(self):
link = (
'<https://acme.okta.com/api/v1/zones?limit=20>; rel="self", '
'<https://acme.okta.com/api/v1/zones?limit=20>; rel="next"'
)
assert next_after_cursor(_resp({"Link": link})) is None
def test_no_next_segment_returns_none(self):
link = '<https://acme.okta.com/api/v1/zones?after=ignored>; rel="self"'
assert next_after_cursor(_resp({"Link": link})) is None
def test_url_decodes_after_cursor(self):
# `parse_qs` decodes percent-encoded values — opaque cursors with
# `=` or `+` must round-trip through callers that re-encode.
link = (
"<https://acme.okta.com/api/v1/zones?after=cursor%3Dabc%2B1>; " 'rel="next"'
)
assert next_after_cursor(_resp({"Link": link})) == "cursor=abc+1"
class Test_paginate:
def test_returns_items_for_single_page_response(self):
async def fetch(_after):
return (["a", "b"], _resp({}), None)
items, err = _run(paginate(fetch))
assert items == ["a", "b"]
assert err is None
def test_drains_multiple_pages(self):
link = '<https://acme.okta.com/api/v1/x?after=p2>; rel="next"'
seen_cursors: list = []
async def fetch(after):
seen_cursors.append(after)
if after is None:
return (["a"], _resp({"link": link}), None)
return (["b"], _resp({}), None)
items, err = _run(paginate(fetch))
assert items == ["a", "b"]
assert err is None
assert seen_cursors == [None, "p2"]
def test_returns_empty_when_first_page_is_empty(self):
async def fetch(_after):
return ([], _resp({}), None)
items, err = _run(paginate(fetch))
assert items == []
assert err is None
def test_returns_empty_and_error_when_first_page_fails(self):
async def fetch(_after):
return ([], _resp({}), Exception("forbidden"))
items, err = _run(paginate(fetch))
assert items == []
assert str(err) == "forbidden"
def test_returns_partial_items_when_subsequent_page_errors(self):
# Carved out of `network_zone_service_test.py`'s
# `test_pagination_returns_partial_items_when_second_page_errors`.
link = '<https://acme.okta.com/api/v1/x?after=p2>; rel="next"'
async def fetch(after):
if after is None:
return (["page-1"], _resp({"link": link}), None)
return ([], _resp({}), Exception("page failed"))
items, err = _run(paginate(fetch))
assert items == ["page-1"]
assert str(err) == "page failed"
def test_accepts_early_error_two_tuple_shape(self):
# The Okta SDK returns `(items, err)` on request-build failures
# (no response) and `(items, resp, err)` on transport responses.
# `paginate` reads `result[-1]` for err so the 2-tuple shape is
# handled — verify explicitly.
async def fetch(_after):
return ([], Exception("create failed"))
items, err = _run(paginate(fetch))
assert items == []
assert str(err) == "create failed"
def test_treats_none_items_as_empty_list(self):
async def fetch(_after):
return (None, _resp({}), None)
items, err = _run(paginate(fetch))
assert items == []
assert err is None
+6
View File
@@ -21,6 +21,9 @@ def set_mocked_okta_provider(
"okta.brands.read",
"okta.apps.read",
"okta.networkZones.read",
"okta.apiTokens.read",
"okta.roles.read",
"okta.groups.read",
"okta.logStreams.read",
"okta.idps.read",
],
@@ -35,6 +38,9 @@ def set_mocked_okta_provider(
"okta.brands.read",
"okta.apps.read",
"okta.networkZones.read",
"okta.apiTokens.read",
"okta.roles.read",
"okta.groups.read",
"okta.logStreams.read",
"okta.idps.read",
],
@@ -0,0 +1,46 @@
from unittest import mock
from prowler.providers.okta.services.apitoken.api_token_service import OktaApiToken
from tests.providers.okta.okta_fixtures import set_mocked_okta_provider
def build_api_token_client(
tokens: dict = None,
known_network_zone_ids: set[str] = None,
missing_scope: dict = None,
):
client = mock.MagicMock()
client.api_tokens = tokens or {}
client.known_network_zone_ids = known_network_zone_ids or {"nzo-corp"}
client.missing_scope = missing_scope or {
"api_tokens": None,
"network_zones": None,
"user_roles": None,
"user_groups": None,
}
client.provider = set_mocked_okta_provider()
return client
def api_token(
token_id: str = "00Tabcdefg1234567890",
name: str = "CI token",
*,
user_id: str = "00uabcdefg1234567890",
network_connection: str = "ZONE",
network_includes: list[str] = None,
network_excludes: list[str] = None,
owner_roles: list[str] = None,
):
return OktaApiToken(
id=token_id,
name=name,
client_name="Okta API",
user_id=user_id,
network_connection=network_connection,
network_includes=(
network_includes if network_includes is not None else ["nzo-corp"]
),
network_excludes=network_excludes or [],
owner_roles=owner_roles or ["READ_ONLY_ADMIN"],
)
@@ -0,0 +1,616 @@
import json
from types import SimpleNamespace
from unittest import mock
from prowler.providers.okta.models import OktaIdentityInfo
from prowler.providers.okta.services.apitoken.api_token_service import ApiToken
from tests.providers.okta.okta_fixtures import set_mocked_okta_provider
def _resp(headers: dict = None):
return SimpleNamespace(headers=headers or {})
def _sdk_token(
token_id: str = "00Tabcdefg1234567890",
name: str = "CI token",
*,
user_id: str = "00uabcdefg1234567890",
connection: str = "ZONE",
include: list[str] = None,
exclude: list[str] = None,
):
return SimpleNamespace(
id=token_id,
name=name,
client_name="Okta API",
user_id=user_id,
network=SimpleNamespace(
connection=connection,
include=include if include is not None else ["nzo-corp"],
exclude=exclude or [],
),
)
def _sdk_role(role_type: str):
return SimpleNamespace(type=role_type, label=role_type.replace("_", " ").title())
def _sdk_role_wrapped(role_type: str):
"""Mimic `ListGroupAssignedRoles200ResponseInner` — a oneOf wrapper
holding the real StandardRole on `.actual_instance`. The Okta SDK
actually returns this shape; treating it like the bare role yields
`type=None, label=None` and the role silently vanishes from the
check.
"""
inner = _sdk_role(role_type)
return SimpleNamespace(actual_instance=inner, type=None, label=None)
def _sdk_zone(zone_id: str, name: str):
return SimpleNamespace(id=zone_id, name=name)
def _sdk_group(group_id: str):
return SimpleNamespace(id=group_id)
async def _empty_list(*_a, **_k):
return ([], _resp({}), None)
class Test_ApiToken_service:
def test_fetches_tokens_roles_and_known_network_zones(self):
provider = set_mocked_okta_provider()
token = _sdk_token()
async def fake_list_api_tokens(*_a, **_k):
return ([token], _resp({}), None)
async def fake_list_assigned_roles_for_user(user_id, *_a, **_k):
assert user_id == token.user_id
return ([_sdk_role("READ_ONLY_ADMIN")], _resp({}), None)
async def fake_list_network_zones(*_a, **_k):
return ([_sdk_zone("nzo-corp", "Corporate")], _resp({}), None)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_api_tokens = fake_list_api_tokens
mocked.list_assigned_roles_for_user = fake_list_assigned_roles_for_user
mocked.list_user_groups = _empty_list
mocked.list_group_assigned_roles = _empty_list
mocked.list_network_zones = fake_list_network_zones
mocked_client_cls.return_value = mocked
service = ApiToken(provider)
assert set(service.api_tokens.keys()) == {token.id}
assert service.api_tokens[token.id].network_connection == "ZONE"
assert service.api_tokens[token.id].owner_roles == ["READ_ONLY_ADMIN"]
assert service.known_network_zone_ids == {"nzo-corp", "Corporate"}
def test_role_fetch_error_keeps_token_with_empty_roles(self):
provider = set_mocked_okta_provider()
token = _sdk_token()
async def fake_list_api_tokens(*_a, **_k):
return ([token], _resp({}), None)
async def fake_roles_error(*_a, **_k):
return ([], _resp({}), Exception("forbidden"))
async def fake_list_network_zones(*_a, **_k):
return ([], _resp({}), None)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_api_tokens = fake_list_api_tokens
mocked.list_assigned_roles_for_user = fake_roles_error
mocked.list_user_groups = _empty_list
mocked.list_group_assigned_roles = _empty_list
mocked.list_network_zones = fake_list_network_zones
mocked_client_cls.return_value = mocked
service = ApiToken(provider)
assert service.api_tokens[token.id].owner_roles == []
def test_falls_back_to_raw_roles_when_sdk_role_is_empty(self):
provider = set_mocked_okta_provider()
token = _sdk_token()
async def fake_list_api_tokens(*_a, **_k):
return ([token], _resp({}), None)
async def fake_list_assigned_roles_for_user(user_id, *_a, **_k):
assert user_id == token.user_id
return ([SimpleNamespace(type=None, label=None)], _resp({}), None)
async def fake_create_request(*_a, **_k):
return ("raw-role-request", None)
async def fake_execute(request, *_a, **_k):
assert request == "raw-role-request"
return (
_resp({}),
json.dumps(
[
{
"id": "ra-super-admin",
"type": "SUPER_ADMIN",
"label": "Super Administrator",
}
]
),
None,
)
async def fake_list_network_zones(*_a, **_k):
return ([_sdk_zone("nzo-corp", "Corporate")], _resp({}), None)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_api_tokens = fake_list_api_tokens
mocked.list_assigned_roles_for_user = fake_list_assigned_roles_for_user
mocked._list_assigned_roles_for_user_serialize.return_value = (
"GET",
"/api/v1/users/00uabcdefg1234567890/roles",
{},
None,
None,
)
mocked._request_executor.create_request = fake_create_request
mocked._request_executor.execute = fake_execute
mocked.list_network_zones = fake_list_network_zones
mocked_client_cls.return_value = mocked
service = ApiToken(provider)
assert service.api_tokens[token.id].owner_roles == ["SUPER_ADMIN"]
def test_paginates_known_network_zones_for_token_validation(self):
provider = set_mocked_okta_provider()
token = _sdk_token(include=["nzo-page-2"])
next_link = '<https://acme.okta.com/api/v1/zones?after=cursor-2>; rel="next"'
async def fake_list_api_tokens(*_a, **_k):
return ([token], _resp({}), None)
async def fake_list_assigned_roles_for_user(*_a, **_k):
return ([_sdk_role("READ_ONLY_ADMIN")], _resp({}), None)
async def fake_list_network_zones(*_a, **kwargs):
if kwargs.get("after") is None:
return (
[_sdk_zone("nzo-page-1", "First")],
_resp({"link": next_link}),
None,
)
return ([_sdk_zone("nzo-page-2", "Second")], _resp({}), None)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_api_tokens = fake_list_api_tokens
mocked.list_assigned_roles_for_user = fake_list_assigned_roles_for_user
mocked.list_user_groups = _empty_list
mocked.list_group_assigned_roles = _empty_list
mocked.list_network_zones = fake_list_network_zones
mocked_client_cls.return_value = mocked
service = ApiToken(provider)
assert service.known_network_zone_ids == {
"nzo-page-1",
"First",
"nzo-page-2",
"Second",
}
def test_falls_back_to_raw_network_zones_when_sdk_listing_fails(self):
provider = set_mocked_okta_provider()
token = _sdk_token(include=["nzo-raw"])
async def fake_list_api_tokens(*_a, **_k):
return ([token], _resp({}), None)
async def fake_list_assigned_roles_for_user(*_a, **_k):
return ([_sdk_role("READ_ONLY_ADMIN")], _resp({}), None)
async def fake_list_network_zones(*_a, **_k):
raise ValueError("EnhancedDynamicNetworkZone SDK deserialization failed")
async def fake_create_request(*_a, **_k):
return ("raw-zones-request", None)
async def fake_execute(request, *_a, **_k):
assert request == "raw-zones-request"
return (
_resp({}),
json.dumps([{"id": "nzo-raw", "name": "Raw Corporate"}]),
None,
)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_api_tokens = fake_list_api_tokens
mocked.list_assigned_roles_for_user = fake_list_assigned_roles_for_user
mocked.list_network_zones = fake_list_network_zones
mocked._list_network_zones_serialize.return_value = (
"GET",
"/api/v1/zones",
{},
None,
None,
)
mocked._request_executor.create_request = fake_create_request
mocked._request_executor.execute = fake_execute
mocked_client_cls.return_value = mocked
service = ApiToken(provider)
assert service.known_network_zone_ids == {"nzo-raw", "Raw Corporate"}
def test_returns_empty_on_token_api_error(self):
provider = set_mocked_okta_provider()
async def failing(*_a, **_k):
return ([], _resp({}), Exception("forbidden"))
async def fake_list_network_zones(*_a, **_k):
return ([], _resp({}), None)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_api_tokens = failing
mocked.list_network_zones = fake_list_network_zones
mocked_client_cls.return_value = mocked
service = ApiToken(provider)
assert service.api_tokens == {}
def test_missing_api_token_scope_skips_dependent_api_calls(self):
provider = set_mocked_okta_provider(
identity=OktaIdentityInfo(
org_domain="acme.okta.com",
client_id="0oa1234567890abcdef",
granted_scopes=["okta.networkZones.read", "okta.roles.read"],
)
)
async def fail_if_called(*_a, **_k):
raise AssertionError("API calls should not run without apiTokens scope")
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_api_tokens = fail_if_called
mocked.list_network_zones = fail_if_called
mocked.list_assigned_roles_for_user = fail_if_called
mocked_client_cls.return_value = mocked
service = ApiToken(provider)
assert service.missing_scope["api_tokens"] == "okta.apiTokens.read"
assert service.api_tokens == {}
assert service.known_network_zone_ids == set()
def test_missing_network_zone_scope_skips_zone_api_call(self):
provider = set_mocked_okta_provider(
identity=OktaIdentityInfo(
org_domain="acme.okta.com",
client_id="0oa1234567890abcdef",
granted_scopes=["okta.apiTokens.read", "okta.roles.read"],
)
)
token = _sdk_token()
async def fake_list_api_tokens(*_a, **_k):
return ([token], _resp({}), None)
async def fake_list_assigned_roles_for_user(*_a, **_k):
return ([_sdk_role("READ_ONLY_ADMIN")], _resp({}), None)
async def fail_if_called(*_a, **_k):
raise AssertionError("list_network_zones should not be called")
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_api_tokens = fake_list_api_tokens
mocked.list_assigned_roles_for_user = fake_list_assigned_roles_for_user
mocked.list_network_zones = fail_if_called
mocked_client_cls.return_value = mocked
service = ApiToken(provider)
assert service.missing_scope["network_zones"] == "okta.networkZones.read"
assert service.known_network_zone_ids == set()
assert set(service.api_tokens.keys()) == {token.id}
def test_missing_role_scope_skips_role_api_call(self):
provider = set_mocked_okta_provider(
identity=OktaIdentityInfo(
org_domain="acme.okta.com",
client_id="0oa1234567890abcdef",
granted_scopes=["okta.apiTokens.read", "okta.networkZones.read"],
)
)
token = _sdk_token()
async def fake_list_api_tokens(*_a, **_k):
return ([token], _resp({}), None)
async def fail_if_called(*_a, **_k):
raise AssertionError("list_assigned_roles_for_user should not be called")
async def fake_list_network_zones(*_a, **_k):
return ([_sdk_zone("nzo-corp", "Corporate")], _resp({}), None)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_api_tokens = fake_list_api_tokens
mocked.list_assigned_roles_for_user = fail_if_called
mocked.list_network_zones = fake_list_network_zones
mocked_client_cls.return_value = mocked
service = ApiToken(provider)
assert service.missing_scope["user_roles"] == "okta.roles.read"
assert service.api_tokens[token.id].owner_roles == []
class Test_ApiToken_service_group_inherited_roles:
"""Verifies effective-role resolution combines direct + group-inherited.
Okta's `/api/v1/users/{userId}/roles` returns only directly-assigned
admin roles. Roles inherited via group membership — the common path
for Super Admin on trial tenants — are invisible to that endpoint.
The service must enumerate the user's groups and combine each
group's role assignments.
"""
def test_group_inherited_super_admin_surfaces(self):
provider = set_mocked_okta_provider()
token = _sdk_token()
async def fake_list_api_tokens(*_a, **_k):
return ([token], _resp({}), None)
async def fake_direct_roles(*_a, **_k):
return ([], _resp({}), None)
async def fake_user_groups(user_id, *_a, **_k):
assert user_id == token.user_id
return (
[_sdk_group("0gp-admins"), _sdk_group("0gp-eng")],
_resp({}),
None,
)
async def fake_group_roles(group_id, *_a, **_k):
if group_id == "0gp-admins":
return ([_sdk_role("SUPER_ADMIN")], _resp({}), None)
return ([], _resp({}), None)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_api_tokens = fake_list_api_tokens
mocked.list_assigned_roles_for_user = fake_direct_roles
mocked.list_user_groups = fake_user_groups
mocked.list_group_assigned_roles = fake_group_roles
mocked.list_network_zones = _empty_list
mocked_client_cls.return_value = mocked
service = ApiToken(provider)
assert service.api_tokens[token.id].owner_roles == ["SUPER_ADMIN"]
def test_direct_plus_group_roles_combined_and_deduped(self):
provider = set_mocked_okta_provider()
token = _sdk_token()
async def fake_list_api_tokens(*_a, **_k):
return ([token], _resp({}), None)
async def fake_direct_roles(*_a, **_k):
return ([_sdk_role("READ_ONLY_ADMIN")], _resp({}), None)
async def fake_user_groups(*_a, **_k):
return ([_sdk_group("0gp-1")], _resp({}), None)
async def fake_group_roles(*_a, **_k):
# READ_ONLY_ADMIN already comes from the direct path; the
# dedupe should keep a single entry. SUPER_ADMIN is new.
return (
[_sdk_role("READ_ONLY_ADMIN"), _sdk_role("SUPER_ADMIN")],
_resp({}),
None,
)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_api_tokens = fake_list_api_tokens
mocked.list_assigned_roles_for_user = fake_direct_roles
mocked.list_user_groups = fake_user_groups
mocked.list_group_assigned_roles = fake_group_roles
mocked.list_network_zones = _empty_list
mocked_client_cls.return_value = mocked
service = ApiToken(provider)
assert service.api_tokens[token.id].owner_roles == [
"READ_ONLY_ADMIN",
"SUPER_ADMIN",
]
def test_role_resolution_cached_per_user_and_group(self):
provider = set_mocked_okta_provider()
token_a = _sdk_token(token_id="00Ttoken-a", user_id="00uowner-1")
token_b = _sdk_token(token_id="00Ttoken-b", user_id="00uowner-1")
token_c = _sdk_token(token_id="00Ttoken-c", user_id="00uowner-2")
direct_calls: list[str] = []
groups_calls: list[str] = []
group_role_calls: list[str] = []
async def fake_list_api_tokens(*_a, **_k):
return ([token_a, token_b, token_c], _resp({}), None)
async def fake_direct_roles(user_id, *_a, **_k):
direct_calls.append(user_id)
return ([], _resp({}), None)
async def fake_user_groups(user_id, *_a, **_k):
groups_calls.append(user_id)
return ([_sdk_group("0gp-shared")], _resp({}), None)
async def fake_group_roles(group_id, *_a, **_k):
group_role_calls.append(group_id)
return ([_sdk_role("HELP_DESK_ADMIN")], _resp({}), None)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_api_tokens = fake_list_api_tokens
mocked.list_assigned_roles_for_user = fake_direct_roles
mocked.list_user_groups = fake_user_groups
mocked.list_group_assigned_roles = fake_group_roles
mocked.list_network_zones = _empty_list
mocked_client_cls.return_value = mocked
service = ApiToken(provider)
# Owner 00uowner-1 appears twice but is resolved once.
assert sorted(direct_calls) == ["00uowner-1", "00uowner-2"]
assert sorted(groups_calls) == ["00uowner-1", "00uowner-2"]
# Shared group resolved once even though both owners belong to it.
assert group_role_calls == ["0gp-shared"]
for token in (token_a, token_b, token_c):
assert service.api_tokens[token.id].owner_roles == ["HELP_DESK_ADMIN"]
def test_missing_groups_scope_falls_back_to_direct_only(self):
provider = set_mocked_okta_provider(
identity=OktaIdentityInfo(
org_domain="acme.okta.com",
client_id="0oa1234567890abcdef",
granted_scopes=[
"okta.apiTokens.read",
"okta.networkZones.read",
"okta.roles.read",
],
)
)
token = _sdk_token()
async def fake_list_api_tokens(*_a, **_k):
return ([token], _resp({}), None)
async def fake_direct_roles(*_a, **_k):
return ([_sdk_role("READ_ONLY_ADMIN")], _resp({}), None)
async def fail_if_called(*_a, **_k):
raise AssertionError(
"list_user_groups must not be called without okta.groups.read"
)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_api_tokens = fake_list_api_tokens
mocked.list_assigned_roles_for_user = fake_direct_roles
mocked.list_user_groups = fail_if_called
mocked.list_group_assigned_roles = fail_if_called
mocked.list_network_zones = _empty_list
mocked_client_cls.return_value = mocked
service = ApiToken(provider)
assert service.missing_scope["user_groups"] == "okta.groups.read"
assert service.api_tokens[token.id].owner_roles == ["READ_ONLY_ADMIN"]
def test_wrapped_oneof_role_shape_is_unwrapped(self):
"""Regression: the SDK returns each role as a oneOf wrapper with
the real StandardRole on `.actual_instance`. The previous
`_role_to_string` read `.type`/`.label` from the wrapper, got
None back, and produced an empty `owner_roles` — causing a
Super Admin token to silently PASS the check."""
provider = set_mocked_okta_provider()
token = _sdk_token()
async def fake_list_api_tokens(*_a, **_k):
return ([token], _resp({}), None)
async def fake_direct_roles(*_a, **_k):
return ([_sdk_role_wrapped("SUPER_ADMIN")], _resp({}), None)
async def fake_user_groups(*_a, **_k):
return ([_sdk_group("0gp-extra")], _resp({}), None)
async def fake_group_roles(*_a, **_k):
return ([_sdk_role_wrapped("APP_ADMIN")], _resp({}), None)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_api_tokens = fake_list_api_tokens
mocked.list_assigned_roles_for_user = fake_direct_roles
mocked.list_user_groups = fake_user_groups
mocked.list_group_assigned_roles = fake_group_roles
mocked.list_network_zones = _empty_list
mocked_client_cls.return_value = mocked
service = ApiToken(provider)
assert service.api_tokens[token.id].owner_roles == [
"SUPER_ADMIN",
"APP_ADMIN",
]
def test_group_role_fetch_failure_does_not_drop_other_groups(self):
provider = set_mocked_okta_provider()
token = _sdk_token()
async def fake_list_api_tokens(*_a, **_k):
return ([token], _resp({}), None)
async def fake_direct_roles(*_a, **_k):
return ([], _resp({}), None)
async def fake_user_groups(*_a, **_k):
return (
[_sdk_group("0gp-broken"), _sdk_group("0gp-good")],
_resp({}),
None,
)
async def fake_group_roles(group_id, *_a, **_k):
if group_id == "0gp-broken":
raise RuntimeError("upstream parse failure")
return ([_sdk_role("SUPER_ADMIN")], _resp({}), None)
with mock.patch(
"prowler.providers.okta.lib.service.service.OktaSDKClient"
) as mocked_client_cls:
mocked = mock.MagicMock()
mocked.list_api_tokens = fake_list_api_tokens
mocked.list_assigned_roles_for_user = fake_direct_roles
mocked.list_user_groups = fake_user_groups
mocked.list_group_assigned_roles = fake_group_roles
mocked.list_network_zones = _empty_list
mocked_client_cls.return_value = mocked
service = ApiToken(provider)
assert service.api_tokens[token.id].owner_roles == ["SUPER_ADMIN"]
@@ -0,0 +1,99 @@
from unittest import mock
from tests.providers.okta.okta_fixtures import set_mocked_okta_provider
from tests.providers.okta.services.api_token.api_token_fixtures import (
api_token,
build_api_token_client,
)
CHECK_PATH = (
"prowler.providers.okta.services.apitoken."
"apitoken_not_super_admin.apitoken_not_super_admin.api_token_client"
)
def _run_check(api_token_client):
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_okta_provider(),
),
mock.patch(CHECK_PATH, new=api_token_client),
):
from prowler.providers.okta.services.apitoken.apitoken_not_super_admin.apitoken_not_super_admin import (
apitoken_not_super_admin,
)
return apitoken_not_super_admin().execute()
class Test_apitoken_not_super_admin:
def test_no_tokens_returns_no_findings(self):
findings = _run_check(build_api_token_client({}))
assert findings == []
def test_missing_api_token_scope_is_manual(self):
findings = _run_check(
build_api_token_client(
{},
missing_scope={"api_tokens": "okta.apiTokens.read"},
)
)
assert len(findings) == 1
assert findings[0].status == "MANUAL"
assert "okta.apiTokens.read" in findings[0].status_extended
assert "okta.roles.read" in findings[0].status_extended
assert "okta.groups.read" in findings[0].status_extended
def test_missing_user_roles_scope_is_manual(self):
token = api_token(owner_roles=[])
findings = _run_check(
build_api_token_client(
{token.id: token},
missing_scope={"user_roles": "okta.roles.read"},
)
)
assert len(findings) == 1
assert findings[0].status == "MANUAL"
assert findings[0].resource_id == token.id
assert "okta.roles.read" in findings[0].status_extended
def test_token_owner_without_super_admin_passes(self):
token = api_token(owner_roles=["READ_ONLY_ADMIN"])
findings = _run_check(build_api_token_client({token.id: token}))
assert len(findings) == 1
assert findings[0].status == "PASS"
assert findings[0].resource_id == token.id
def test_token_owner_with_super_admin_fails(self):
token = api_token(owner_roles=["SUPER_ADMIN"])
findings = _run_check(build_api_token_client({token.id: token}))
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "Super Admin" in findings[0].status_extended
def test_missing_groups_scope_adds_best_effort_caveat_on_pass(self):
token = api_token(owner_roles=["READ_ONLY_ADMIN"])
findings = _run_check(
build_api_token_client(
{token.id: token},
missing_scope={"user_groups": "okta.groups.read"},
)
)
assert len(findings) == 1
assert findings[0].status == "PASS"
assert "Group-inherited roles were not checked" in findings[0].status_extended
assert "okta.groups.read" in findings[0].status_extended
def test_missing_groups_scope_does_not_caveat_when_owner_is_super_admin(self):
token = api_token(owner_roles=["SUPER_ADMIN"])
findings = _run_check(
build_api_token_client(
{token.id: token},
missing_scope={"user_groups": "okta.groups.read"},
)
)
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "Super Admin" in findings[0].status_extended
assert "Group-inherited" not in findings[0].status_extended
@@ -0,0 +1,114 @@
from unittest import mock
from tests.providers.okta.okta_fixtures import set_mocked_okta_provider
from tests.providers.okta.services.api_token.api_token_fixtures import (
api_token,
build_api_token_client,
)
CHECK_PATH = (
"prowler.providers.okta.services.apitoken."
"apitoken_restricted_to_network_zone.apitoken_restricted_to_network_zone.api_token_client"
)
def _run_check(api_token_client):
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_okta_provider(),
),
mock.patch(CHECK_PATH, new=api_token_client),
):
from prowler.providers.okta.services.apitoken.apitoken_restricted_to_network_zone.apitoken_restricted_to_network_zone import (
apitoken_restricted_to_network_zone,
)
return apitoken_restricted_to_network_zone().execute()
class Test_apitoken_restricted_to_network_zone:
def test_no_tokens_returns_no_findings(self):
findings = _run_check(build_api_token_client({}))
assert findings == []
def test_missing_api_token_scope_is_manual(self):
findings = _run_check(
build_api_token_client(
{},
missing_scope={"api_tokens": "okta.apiTokens.read"},
)
)
assert len(findings) == 1
assert findings[0].status == "MANUAL"
assert "okta.apiTokens.read" in findings[0].status_extended
assert "okta.networkZones.read" in findings[0].status_extended
def test_missing_network_zone_scope_is_manual(self):
token = api_token(network_connection="ZONE", network_includes=["nzo-corp"])
findings = _run_check(
build_api_token_client(
{token.id: token},
missing_scope={"network_zones": "okta.networkZones.read"},
)
)
assert len(findings) == 1
assert findings[0].status == "MANUAL"
assert findings[0].resource_id == token.id
assert "okta.networkZones.read" in findings[0].status_extended
def test_missing_network_zone_scope_still_fails_anywhere_token(self):
token = api_token(network_connection="ANYWHERE", network_includes=[])
findings = _run_check(
build_api_token_client(
{token.id: token},
missing_scope={"network_zones": "okta.networkZones.read"},
)
)
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "from any IP" in findings[0].status_extended
def test_token_restricted_to_known_network_zone_passes(self):
token = api_token(network_connection="ZONE", network_includes=["nzo-corp"])
findings = _run_check(
build_api_token_client(
{token.id: token}, known_network_zone_ids={"nzo-corp"}
)
)
assert len(findings) == 1
assert findings[0].status == "PASS"
assert findings[0].resource_id == token.id
def test_token_with_only_excluded_network_zone_fails(self):
token = api_token(
network_connection="ZONE",
network_includes=[],
network_excludes=["nzo-blocked"],
)
findings = _run_check(
build_api_token_client(
{token.id: token}, known_network_zone_ids={"nzo-blocked"}
)
)
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "does not allowlist" in findings[0].status_extended
def test_token_open_to_anywhere_fails(self):
token = api_token(network_connection="ANYWHERE", network_includes=[])
findings = _run_check(build_api_token_client({token.id: token}))
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "from any IP" in findings[0].status_extended
def test_token_restricted_to_unknown_zone_fails(self):
token = api_token(network_connection="ZONE", network_includes=["nzo-missing"])
findings = _run_check(
build_api_token_client(
{token.id: token}, known_network_zone_ids={"nzo-corp"}
)
)
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "unknown Network Zone" in findings[0].status_extended