feat(m365/entra): add entra_conditional_access_policy_no_deleted_object_references check (#11236)

Co-authored-by: Daniel Barranquero <danielbo2001@gmail.com>
This commit is contained in:
Zeus Almightee
2026-06-17 10:16:49 -04:00
committed by GitHub
parent e2ce41a492
commit e8ffe59ce2
7 changed files with 1002 additions and 1 deletions
@@ -0,0 +1,466 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.m365.services.entra.entra_service import (
ApplicationsConditions,
ConditionalAccessPolicy,
ConditionalAccessPolicyState,
Conditions,
GrantControlOperator,
GrantControls,
PersistentBrowser,
SessionControls,
SignInFrequency,
UsersConditions,
)
from tests.providers.m365.m365_fixtures import DOMAIN, set_mocked_m365_provider
def _make_policy(
*,
display_name="Test Policy",
state=ConditionalAccessPolicyState.ENABLED,
included_users=None,
excluded_users=None,
included_groups=None,
excluded_groups=None,
included_roles=None,
excluded_roles=None,
):
"""Build a ConditionalAccessPolicy with the minimum fields required by the model."""
policy_id = str(uuid4())
policy = ConditionalAccessPolicy(
id=policy_id,
display_name=display_name,
conditions=Conditions(
application_conditions=ApplicationsConditions(
included_applications=[],
excluded_applications=[],
included_user_actions=[],
),
user_conditions=UsersConditions(
included_users=included_users or [],
excluded_users=excluded_users or [],
included_groups=included_groups or [],
excluded_groups=excluded_groups or [],
included_roles=included_roles or [],
excluded_roles=excluded_roles or [],
),
client_app_types=[],
),
grant_controls=GrantControls(
built_in_controls=[],
operator=GrantControlOperator.OR,
authentication_strength=None,
),
session_controls=SessionControls(
persistent_browser=PersistentBrowser(is_enabled=False, mode=""),
sign_in_frequency=SignInFrequency(
is_enabled=False, frequency=None, type=None, interval=None
),
),
state=state,
)
return policy_id, policy
def _entra_client_mock():
client = mock.MagicMock()
client.audited_tenant = "audited_tenant"
client.audited_domain = DOMAIN
# Default to clean resolution; individual tests override as needed.
client.unresolved_directory_object_references = set()
client.errored_directory_object_references = set()
return client
CHECK_MODULE = (
"prowler.providers.m365.services.entra."
"entra_conditional_access_policy_no_deleted_object_references."
"entra_conditional_access_policy_no_deleted_object_references.entra_client"
)
class Test_entra_conditional_access_policy_no_deleted_object_references:
def test_no_policies(self):
"""No Conditional Access policies in tenant: no findings."""
entra_client = _entra_client_mock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(CHECK_MODULE, new=entra_client),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_no_deleted_object_references.entra_conditional_access_policy_no_deleted_object_references import (
entra_conditional_access_policy_no_deleted_object_references,
)
entra_client.conditional_access_policies = {}
entra_client.unresolved_directory_object_references = set()
check = entra_conditional_access_policy_no_deleted_object_references()
result = check.execute()
assert len(result) == 0
def test_sentinel_only_references_pass(self):
"""Policy with only sentinel values ('All', 'GuestsOrExternalUsers') passes."""
entra_client = _entra_client_mock()
policy_id, policy = _make_policy(
display_name="MFA For All",
included_users=["All"],
excluded_users=["GuestsOrExternalUsers"],
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(CHECK_MODULE, new=entra_client),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_no_deleted_object_references.entra_conditional_access_policy_no_deleted_object_references import (
entra_conditional_access_policy_no_deleted_object_references,
)
entra_client.conditional_access_policies = {policy_id: policy}
entra_client.unresolved_directory_object_references = set()
check = entra_conditional_access_policy_no_deleted_object_references()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
"references no deleted directory objects" in result[0].status_extended
)
assert result[0].resource_id == policy_id
assert result[0].resource_name == "MFA For All"
def test_all_references_resolve_pass(self):
"""Policy with real identifiers, none in the unresolved set: PASS."""
entra_client = _entra_client_mock()
live_user = str(uuid4())
live_group = str(uuid4())
policy_id, policy = _make_policy(
display_name="Targeted Policy",
included_users=[live_user],
included_groups=[live_group],
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(CHECK_MODULE, new=entra_client),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_no_deleted_object_references.entra_conditional_access_policy_no_deleted_object_references import (
entra_conditional_access_policy_no_deleted_object_references,
)
entra_client.conditional_access_policies = {policy_id: policy}
entra_client.unresolved_directory_object_references = set()
check = entra_conditional_access_policy_no_deleted_object_references()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
def test_deleted_user_in_include_fails(self):
"""Policy referencing a deleted user in includeUsers fails with type+side reported."""
entra_client = _entra_client_mock()
deleted_user = str(uuid4())
policy_id, policy = _make_policy(
display_name="Require MFA",
included_users=[deleted_user],
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(CHECK_MODULE, new=entra_client),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_no_deleted_object_references.entra_conditional_access_policy_no_deleted_object_references import (
entra_conditional_access_policy_no_deleted_object_references,
)
entra_client.conditional_access_policies = {policy_id: policy}
entra_client.unresolved_directory_object_references = {
("user", deleted_user)
}
check = entra_conditional_access_policy_no_deleted_object_references()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "1 deleted directory object(s)" in result[0].status_extended
assert "users:" in result[0].status_extended
assert deleted_user in result[0].status_extended
assert "(include)" in result[0].status_extended
def test_deleted_group_in_exclude_fails(self):
"""Policy referencing a deleted group in excludeGroups fails with exclude side reported."""
entra_client = _entra_client_mock()
deleted_group = str(uuid4())
policy_id, policy = _make_policy(
display_name="Block Legacy Auth",
included_users=["All"],
excluded_groups=[deleted_group],
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(CHECK_MODULE, new=entra_client),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_no_deleted_object_references.entra_conditional_access_policy_no_deleted_object_references import (
entra_conditional_access_policy_no_deleted_object_references,
)
entra_client.conditional_access_policies = {policy_id: policy}
entra_client.unresolved_directory_object_references = {
("group", deleted_group)
}
check = entra_conditional_access_policy_no_deleted_object_references()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "groups:" in result[0].status_extended
assert "(exclude)" in result[0].status_extended
def test_deleted_role_in_disabled_policy_still_fails(self):
"""Disabled policy with a stale role reference still FAILs (per spec)."""
entra_client = _entra_client_mock()
deleted_role = str(uuid4())
policy_id, policy = _make_policy(
display_name="Legacy Admin Policy",
state=ConditionalAccessPolicyState.DISABLED,
included_roles=[deleted_role],
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(CHECK_MODULE, new=entra_client),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_no_deleted_object_references.entra_conditional_access_policy_no_deleted_object_references import (
entra_conditional_access_policy_no_deleted_object_references,
)
entra_client.conditional_access_policies = {policy_id: policy}
entra_client.unresolved_directory_object_references = {
("role", deleted_role)
}
check = entra_conditional_access_policy_no_deleted_object_references()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "roles:" in result[0].status_extended
assert deleted_role in result[0].status_extended
def test_orphans_grouped_by_type_across_collections(self):
"""A single policy with orphans of every type aggregates them grouped by type."""
entra_client = _entra_client_mock()
deleted_user = str(uuid4())
deleted_group = str(uuid4())
deleted_role = str(uuid4())
policy_id, policy = _make_policy(
display_name="Composite Policy",
included_users=[deleted_user],
excluded_groups=[deleted_group],
included_roles=[deleted_role],
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(CHECK_MODULE, new=entra_client),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_no_deleted_object_references.entra_conditional_access_policy_no_deleted_object_references import (
entra_conditional_access_policy_no_deleted_object_references,
)
entra_client.conditional_access_policies = {policy_id: policy}
entra_client.unresolved_directory_object_references = {
("user", deleted_user),
("group", deleted_group),
("role", deleted_role),
}
check = entra_conditional_access_policy_no_deleted_object_references()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "3 deleted directory object(s)" in result[0].status_extended
assert "users:" in result[0].status_extended
assert "groups:" in result[0].status_extended
assert "roles:" in result[0].status_extended
def test_report_only_policy_failure_notes_mode(self):
"""A report-only policy with an orphan FAILs and flags the not-yet-enforced state."""
entra_client = _entra_client_mock()
deleted_user = str(uuid4())
policy_id, policy = _make_policy(
display_name="Report Only MFA",
state=ConditionalAccessPolicyState.ENABLED_FOR_REPORTING,
included_users=[deleted_user],
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(CHECK_MODULE, new=entra_client),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_no_deleted_object_references.entra_conditional_access_policy_no_deleted_object_references import (
entra_conditional_access_policy_no_deleted_object_references,
)
entra_client.conditional_access_policies = {policy_id: policy}
entra_client.unresolved_directory_object_references = {
("user", deleted_user)
}
check = entra_conditional_access_policy_no_deleted_object_references()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "report-only mode" in result[0].status_extended
def test_unverified_reference_is_manual(self):
"""A reference that errored (non-404) yields MANUAL, not PASS."""
entra_client = _entra_client_mock()
errored_group = str(uuid4())
policy_id, policy = _make_policy(
display_name="Throttled Lookup Policy",
included_users=["All"],
excluded_groups=[errored_group],
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(CHECK_MODULE, new=entra_client),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_no_deleted_object_references.entra_conditional_access_policy_no_deleted_object_references import (
entra_conditional_access_policy_no_deleted_object_references,
)
entra_client.conditional_access_policies = {policy_id: policy}
entra_client.unresolved_directory_object_references = set()
entra_client.errored_directory_object_references = {
("group", errored_group)
}
check = entra_conditional_access_policy_no_deleted_object_references()
result = check.execute()
assert len(result) == 1
assert result[0].status == "MANUAL"
assert "could not be fully evaluated" in result[0].status_extended
assert errored_group in result[0].status_extended
def test_orphan_takes_precedence_over_unverified(self):
"""A confirmed deletion FAILs even when another reference is unverified."""
entra_client = _entra_client_mock()
deleted_user = str(uuid4())
errored_group = str(uuid4())
policy_id, policy = _make_policy(
display_name="Mixed Policy",
included_users=[deleted_user],
excluded_groups=[errored_group],
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(CHECK_MODULE, new=entra_client),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_no_deleted_object_references.entra_conditional_access_policy_no_deleted_object_references import (
entra_conditional_access_policy_no_deleted_object_references,
)
entra_client.conditional_access_policies = {policy_id: policy}
entra_client.unresolved_directory_object_references = {
("user", deleted_user)
}
entra_client.errored_directory_object_references = {
("group", errored_group)
}
check = entra_conditional_access_policy_no_deleted_object_references()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "1 deleted directory object(s)" in result[0].status_extended
assert "could not be verified" in result[0].status_extended
def test_multiple_policies_mixed(self):
"""Two policies: one clean, one with an orphan. Distinct PASS/FAIL findings."""
entra_client = _entra_client_mock()
deleted_user = str(uuid4())
clean_id, clean_policy = _make_policy(
display_name="Clean Policy",
included_users=["All"],
)
dirty_id, dirty_policy = _make_policy(
display_name="Stale Reference Policy",
excluded_users=[deleted_user],
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(CHECK_MODULE, new=entra_client),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_no_deleted_object_references.entra_conditional_access_policy_no_deleted_object_references import (
entra_conditional_access_policy_no_deleted_object_references,
)
entra_client.conditional_access_policies = {
clean_id: clean_policy,
dirty_id: dirty_policy,
}
entra_client.unresolved_directory_object_references = {
("user", deleted_user)
}
check = entra_conditional_access_policy_no_deleted_object_references()
result = check.execute()
assert len(result) == 2
clean_result = next(r for r in result if r.resource_id == clean_id)
dirty_result = next(r for r in result if r.resource_id == dirty_id)
assert clean_result.status == "PASS"
assert dirty_result.status == "FAIL"
assert "(exclude)" in dirty_result.status_extended
@@ -878,3 +878,189 @@ class Test_Entra_Service:
assert merged.password_credentials[0].key_id == "cred-app"
assert merged.password_credentials[0].display_name == "app-level-secret"
assert merged.password_credentials[0].is_active()
def test__resolve_identifiers_for_type_flags_only_404(self):
"""Only HTTP 404 / Request_ResourceNotFound mark an id as deleted.
Transient errors (5xx, throttling) and successful resolutions must
never be added to the unresolved set — that is the contract the check
relies on to avoid false positives during Graph outages.
"""
from msgraph.generated.models.o_data_errors.main_error import MainError
from msgraph.generated.models.o_data_errors.o_data_error import ODataError
deleted_by_status = "deleted-status-404"
deleted_by_code = "deleted-code-rnf"
transient = "transient-503"
live = "live-user"
error_404 = ODataError()
error_404.response_status_code = 404
error_404.error = None # status code alone is enough
error_rnf = ODataError()
error_rnf.response_status_code = None
error_rnf.error = MainError()
error_rnf.error.code = "Request_ResourceNotFound"
error_503 = ODataError()
error_503.response_status_code = 503
error_503.error = MainError()
error_503.error.code = "ServiceUnavailable"
user_builders = {
deleted_by_status: SimpleNamespace(get=AsyncMock(side_effect=error_404)),
deleted_by_code: SimpleNamespace(get=AsyncMock(side_effect=error_rnf)),
transient: SimpleNamespace(get=AsyncMock(side_effect=error_503)),
live: SimpleNamespace(get=AsyncMock(return_value=SimpleNamespace(id=live))),
}
entra_service = Entra.__new__(Entra)
entra_service.client = SimpleNamespace(
users=SimpleNamespace(
by_user_id=MagicMock(side_effect=lambda uid: user_builders[uid])
)
)
unresolved = set()
errored = set()
asyncio.run(
entra_service._resolve_identifiers_for_type(
"user", set(user_builders), unresolved, errored
)
)
assert unresolved == {
("user", deleted_by_status),
("user", deleted_by_code),
}
# The transient 503 must be recorded as errored (unverified), never as
# deleted and never silently dropped.
assert errored == {("user", transient)}
def test__resolve_identifiers_for_type_role_uses_role_definitions_endpoint(self):
"""A deleted role is resolved against the roleDefinitions endpoint."""
from msgraph.generated.models.o_data_errors.o_data_error import ODataError
deleted_role = "deleted-role-id"
error_404 = ODataError()
error_404.response_status_code = 404
error_404.error = None
by_role_id = MagicMock(
return_value=SimpleNamespace(get=AsyncMock(side_effect=error_404))
)
entra_service = Entra.__new__(Entra)
entra_service.client = SimpleNamespace(
role_management=SimpleNamespace(
directory=SimpleNamespace(
role_definitions=SimpleNamespace(
by_unified_role_definition_id=by_role_id
)
)
)
)
unresolved = set()
errored = set()
asyncio.run(
entra_service._resolve_identifiers_for_type(
"role", {deleted_role}, unresolved, errored
)
)
assert unresolved == {("role", deleted_role)}
assert errored == set()
by_role_id.assert_called_once_with(deleted_role)
def test__resolve_directory_object_references_skips_sentinels_and_dedups(self):
"""End-to-end resolver: sentinels are never queried, ids are deduped
across policies, and only deleted ids land in the unresolved set."""
from msgraph.generated.models.o_data_errors.o_data_error import ODataError
deleted_user = "deleted-user-id"
live_user = "live-user-id"
deleted_group = "deleted-group-id"
errored_group = "errored-group-id"
def _user_conditions(**kwargs):
base = {
"included_users": [],
"excluded_users": [],
"included_groups": [],
"excluded_groups": [],
"included_roles": [],
"excluded_roles": [],
}
base.update(kwargs)
return SimpleNamespace(**base)
def _policy(user_conditions):
return SimpleNamespace(
conditions=SimpleNamespace(user_conditions=user_conditions)
)
policies = {
"policy-a": _policy(
_user_conditions(
included_users=["All", deleted_user, live_user],
excluded_groups=[deleted_group, errored_group],
)
),
# Same deleted_user referenced again — must be resolved only once.
"policy-b": _policy(
_user_conditions(
included_users=[deleted_user],
excluded_users=["GuestsOrExternalUsers"],
)
),
# Policy without user conditions must be skipped without error.
"policy-c": SimpleNamespace(
conditions=SimpleNamespace(user_conditions=None)
),
}
error_404 = ODataError()
error_404.response_status_code = 404
error_404.error = None
error_503 = ODataError()
error_503.response_status_code = 503
error_503.error = None
user_builders = {
deleted_user: SimpleNamespace(get=AsyncMock(side_effect=error_404)),
live_user: SimpleNamespace(
get=AsyncMock(return_value=SimpleNamespace(id=live_user))
),
}
group_builders = {
deleted_group: SimpleNamespace(get=AsyncMock(side_effect=error_404)),
errored_group: SimpleNamespace(get=AsyncMock(side_effect=error_503)),
}
by_user_id = MagicMock(side_effect=lambda uid: user_builders[uid])
by_group_id = MagicMock(side_effect=lambda gid: group_builders[gid])
entra_service = Entra.__new__(Entra)
entra_service.client = SimpleNamespace(
users=SimpleNamespace(by_user_id=by_user_id),
groups=SimpleNamespace(by_group_id=by_group_id),
)
unresolved, errored = asyncio.run(
entra_service._resolve_directory_object_references(policies)
)
assert unresolved == {
("user", deleted_user),
("group", deleted_group),
}
# The 503 group is unverified, not deleted — it lands in errored.
assert errored == {("group", errored_group)}
# Sentinels are filtered before any Graph call; only the two real user
# ids are queried, and the deduped deleted_user is queried exactly once.
queried_users = {call.args[0] for call in by_user_id.call_args_list}
assert queried_users == {deleted_user, live_user}
assert user_builders[deleted_user].get.await_count == 1