test(m365/entra): cover CA deleted-object-reference resolver and report-only message

Add service-layer tests for the directory-object resolver (404/Request_ResourceNotFound flags as deleted; 5xx/throttling and successful resolutions do not; roles resolve via the roleDefinitions endpoint; sentinels skipped and identifiers deduped across policies).

Surface report-only mode in the check's failure message, link the related directory-sync-account check via RelatedTo metadata, and hoist the shared sentinel set to a single CONDITIONAL_ACCESS_SENTINEL_IDS constant in entra_service.
This commit is contained in:
Daniel Barranquero
2026-06-17 14:19:13 +02:00
parent 83f8c0bfee
commit e29978bde6
5 changed files with 234 additions and 12 deletions
@@ -37,6 +37,8 @@
"e3"
],
"DependsOn": [],
"RelatedTo": [],
"RelatedTo": [
"entra_conditional_access_policy_directory_sync_account_excluded"
],
"Notes": "The check runs against all Conditional Access policies regardless of state (enabled, disabled, enabledForReportingButNotEnforced) — stale references in disabled policies are a misconfiguration that becomes live the moment the policy is re-enabled. Transient Graph errors (5xx, throttling) are not treated as deletions; only HTTP 404 responses flag an identifier as orphaned."
}
@@ -1,9 +1,9 @@
from prowler.lib.check.models import Check, CheckReportM365
from prowler.providers.m365.services.entra.entra_client import entra_client
# Sentinel identifiers used in conditions.users collections that do not
# correspond to real directory objects and must not be resolved against Graph.
_SENTINEL_IDS = {"All", "None", "GuestsOrExternalUsers"}
from prowler.providers.m365.services.entra.entra_service import (
CONDITIONAL_ACCESS_SENTINEL_IDS,
ConditionalAccessPolicyState,
)
class entra_conditional_access_policy_no_deleted_object_references(Check):
@@ -47,7 +47,7 @@ class entra_conditional_access_policy_no_deleted_object_references(Check):
else:
report.status = "FAIL"
report.status_extended = self._format_failure(
policy.display_name, orphans
policy.display_name, orphans, policy.state
)
findings.append(report)
@@ -80,14 +80,14 @@ class entra_conditional_access_policy_no_deleted_object_references(Check):
orphans = []
for type_, side, identifiers in collections:
for identifier in identifiers:
if identifier in _SENTINEL_IDS:
if identifier in CONDITIONAL_ACCESS_SENTINEL_IDS:
continue
if (type_, identifier) in unresolved:
orphans.append((type_, identifier, side))
return orphans
@staticmethod
def _format_failure(display_name, orphans):
def _format_failure(display_name, orphans, state=None):
# Group orphans by type for a readable, deterministic message.
by_type = {"user": [], "group": [], "role": []}
for type_, identifier, side in orphans:
@@ -99,7 +99,17 @@ class entra_conditional_access_policy_no_deleted_object_references(Check):
joined = ", ".join(sorted(by_type[type_]))
parts.append(f"{type_}s: {joined}")
# Surface report-only mode explicitly: the stale references are not yet
# enforced, but become live the moment the policy is turned on.
report_only = (
" The policy is in report-only mode, so these references are not "
"enforced yet but will take effect once it is enabled."
if state == ConditionalAccessPolicyState.ENABLED_FOR_REPORTING
else ""
)
return (
f"Conditional Access policy {display_name} references "
f"{len(orphans)} deleted directory object(s) — {'; '.join(parts)}."
f"{report_only}"
)
@@ -18,6 +18,12 @@ from prowler.lib.logger import logger
from prowler.providers.m365.lib.service.service import M365Service
from prowler.providers.m365.m365_provider import M365Provider
# Sentinel identifiers used in Conditional Access ``conditions.users``
# collections that do not correspond to real directory objects and must not be
# resolved against Graph. Shared by the resolver below and the check that reads
# its result.
CONDITIONAL_ACCESS_SENTINEL_IDS = {"All", "None", "GuestsOrExternalUsers"}
class Entra(M365Service):
"""
@@ -1358,7 +1364,6 @@ OAuthAppInfo
"Access policies..."
)
sentinels = {"All", "None", "GuestsOrExternalUsers"}
ids_by_type: Dict[str, Set[str]] = {
"user": set(),
"group": set(),
@@ -1374,17 +1379,17 @@ OAuthAppInfo
for ident in (user_conditions.included_users or []) + (
user_conditions.excluded_users or []
):
if ident and ident not in sentinels:
if ident and ident not in CONDITIONAL_ACCESS_SENTINEL_IDS:
ids_by_type["user"].add(ident)
for ident in (user_conditions.included_groups or []) + (
user_conditions.excluded_groups or []
):
if ident and ident not in sentinels:
if ident and ident not in CONDITIONAL_ACCESS_SENTINEL_IDS:
ids_by_type["group"].add(ident)
for ident in (user_conditions.included_roles or []) + (
user_conditions.excluded_roles or []
):
if ident and ident not in sentinels:
if ident and ident not in CONDITIONAL_ACCESS_SENTINEL_IDS:
ids_by_type["role"].add(ident)
unresolved: Set[Tuple[str, str]] = set()
@@ -311,6 +311,39 @@ class Test_entra_conditional_access_policy_no_deleted_object_references:
assert "groups:" in result[0].status_extended
assert "roles:" in result[0].status_extended
def test_report_only_policy_failure_notes_mode(self):
"""A report-only policy with an orphan FAILs and flags the not-yet-enforced state."""
entra_client = _entra_client_mock()
deleted_user = str(uuid4())
policy_id, policy = _make_policy(
display_name="Report Only MFA",
state=ConditionalAccessPolicyState.ENABLED_FOR_REPORTING,
included_users=[deleted_user],
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(CHECK_MODULE, new=entra_client),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_no_deleted_object_references.entra_conditional_access_policy_no_deleted_object_references import (
entra_conditional_access_policy_no_deleted_object_references,
)
entra_client.conditional_access_policies = {policy_id: policy}
entra_client.unresolved_directory_object_references = {
("user", deleted_user)
}
check = entra_conditional_access_policy_no_deleted_object_references()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "report-only mode" in result[0].status_extended
def test_multiple_policies_mixed(self):
"""Two policies: one clean, one with an orphan. Distinct PASS/FAIL findings."""
entra_client = _entra_client_mock()
@@ -873,3 +873,175 @@ class Test_Entra_Service:
assert merged.password_credentials[0].key_id == "cred-app"
assert merged.password_credentials[0].display_name == "app-level-secret"
assert merged.password_credentials[0].is_active()
def test__resolve_identifiers_for_type_flags_only_404(self):
"""Only HTTP 404 / Request_ResourceNotFound mark an id as deleted.
Transient errors (5xx, throttling) and successful resolutions must
never be added to the unresolved set — that is the contract the check
relies on to avoid false positives during Graph outages.
"""
from msgraph.generated.models.o_data_errors.main_error import MainError
from msgraph.generated.models.o_data_errors.o_data_error import ODataError
deleted_by_status = "deleted-status-404"
deleted_by_code = "deleted-code-rnf"
transient = "transient-503"
live = "live-user"
error_404 = ODataError()
error_404.response_status_code = 404
error_404.error = None # status code alone is enough
error_rnf = ODataError()
error_rnf.response_status_code = None
error_rnf.error = MainError()
error_rnf.error.code = "Request_ResourceNotFound"
error_503 = ODataError()
error_503.response_status_code = 503
error_503.error = MainError()
error_503.error.code = "ServiceUnavailable"
user_builders = {
deleted_by_status: SimpleNamespace(get=AsyncMock(side_effect=error_404)),
deleted_by_code: SimpleNamespace(get=AsyncMock(side_effect=error_rnf)),
transient: SimpleNamespace(get=AsyncMock(side_effect=error_503)),
live: SimpleNamespace(get=AsyncMock(return_value=SimpleNamespace(id=live))),
}
entra_service = Entra.__new__(Entra)
entra_service.client = SimpleNamespace(
users=SimpleNamespace(
by_user_id=MagicMock(side_effect=lambda uid: user_builders[uid])
)
)
unresolved = set()
asyncio.run(
entra_service._resolve_identifiers_for_type(
"user", set(user_builders), unresolved
)
)
assert unresolved == {
("user", deleted_by_status),
("user", deleted_by_code),
}
def test__resolve_identifiers_for_type_role_uses_role_definitions_endpoint(self):
"""A deleted role is resolved against the roleDefinitions endpoint."""
from msgraph.generated.models.o_data_errors.o_data_error import ODataError
deleted_role = "deleted-role-id"
error_404 = ODataError()
error_404.response_status_code = 404
error_404.error = None
by_role_id = MagicMock(
return_value=SimpleNamespace(get=AsyncMock(side_effect=error_404))
)
entra_service = Entra.__new__(Entra)
entra_service.client = SimpleNamespace(
role_management=SimpleNamespace(
directory=SimpleNamespace(
role_definitions=SimpleNamespace(
by_unified_role_definition_id=by_role_id
)
)
)
)
unresolved = set()
asyncio.run(
entra_service._resolve_identifiers_for_type(
"role", {deleted_role}, unresolved
)
)
assert unresolved == {("role", deleted_role)}
by_role_id.assert_called_once_with(deleted_role)
def test__resolve_directory_object_references_skips_sentinels_and_dedups(self):
"""End-to-end resolver: sentinels are never queried, ids are deduped
across policies, and only deleted ids land in the unresolved set."""
from msgraph.generated.models.o_data_errors.o_data_error import ODataError
deleted_user = "deleted-user-id"
live_user = "live-user-id"
deleted_group = "deleted-group-id"
def _user_conditions(**kwargs):
base = {
"included_users": [],
"excluded_users": [],
"included_groups": [],
"excluded_groups": [],
"included_roles": [],
"excluded_roles": [],
}
base.update(kwargs)
return SimpleNamespace(**base)
def _policy(user_conditions):
return SimpleNamespace(
conditions=SimpleNamespace(user_conditions=user_conditions)
)
policies = {
"policy-a": _policy(
_user_conditions(
included_users=["All", deleted_user, live_user],
excluded_groups=[deleted_group],
)
),
# Same deleted_user referenced again — must be resolved only once.
"policy-b": _policy(
_user_conditions(
included_users=[deleted_user],
excluded_users=["GuestsOrExternalUsers"],
)
),
# Policy without user conditions must be skipped without error.
"policy-c": SimpleNamespace(
conditions=SimpleNamespace(user_conditions=None)
),
}
error_404 = ODataError()
error_404.response_status_code = 404
error_404.error = None
user_builders = {
deleted_user: SimpleNamespace(get=AsyncMock(side_effect=error_404)),
live_user: SimpleNamespace(
get=AsyncMock(return_value=SimpleNamespace(id=live_user))
),
}
group_builders = {
deleted_group: SimpleNamespace(get=AsyncMock(side_effect=error_404)),
}
by_user_id = MagicMock(side_effect=lambda uid: user_builders[uid])
by_group_id = MagicMock(side_effect=lambda gid: group_builders[gid])
entra_service = Entra.__new__(Entra)
entra_service.client = SimpleNamespace(
users=SimpleNamespace(by_user_id=by_user_id),
groups=SimpleNamespace(by_group_id=by_group_id),
)
unresolved = asyncio.run(
entra_service._resolve_directory_object_references(policies)
)
assert unresolved == {
("user", deleted_user),
("group", deleted_group),
}
# Sentinels are filtered before any Graph call; only the two real user
# ids are queried, and the deduped deleted_user is queried exactly once.
queried_users = {call.args[0] for call in by_user_id.call_args_list}
assert queried_users == {deleted_user, live_user}
assert user_builders[deleted_user].get.await_count == 1