feat(m365): add defenderxdr_critical_asset_management_pending_approvals security check (#10085)

Co-authored-by: HugoPBrito <hugopbrit@gmail.com>
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
This commit is contained in:
Andoni Alonso
2026-02-19 18:49:41 +01:00
committed by GitHub
parent e8c0a37d50
commit ea60f2d082
8 changed files with 432 additions and 7 deletions

View File

@@ -46,7 +46,7 @@ When using service principal authentication, add these **Application Permissions
- `SecurityIdentitiesHealth.Read.All`: Required for `defenderidentity_health_issues_no_open` check.
- `SecurityIdentitiesSensors.Read.All`: Required for `defenderidentity_health_issues_no_open` check.
- `SharePointTenantSettings.Read.All`: Required for SharePoint service.
- `ThreatHunting.Read.All`: Required for Entra checks that use Defender XDR Advanced Hunting (e.g., unused privileged permissions detection). Also requires App Governance to be enabled in Microsoft Defender for Cloud Apps.
- `ThreatHunting.Read.All`: Required for Defender XDR checks (`defenderxdr_endpoint_privileged_user_exposed_credentials`, `defenderxdr_critical_asset_management_pending_approvals`).
**External API Permissions:**

View File

@@ -23,6 +23,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- CSA CCM 4.0 for the Alibaba Cloud provider [(#10061)](https://github.com/prowler-cloud/prowler/pull/10061)
- ECS Exec (ECS-006) privilege escalation detection via `ecs:ExecuteCommand` + `ecs:DescribeTasks` [(#10066)](https://github.com/prowler-cloud/prowler/pull/10066)
- `defenderxdr_endpoint_privileged_user_exposed_credentials` check for M365 provider [(#10084)](https://github.com/prowler-cloud/prowler/pull/10084)
- `defenderxdr_critical_asset_management_pending_approvals` check for M365 provider [(#10085)](https://github.com/prowler-cloud/prowler/pull/10085)
- `entra_seamless_sso_disabled` check for m365 provider [(#10086)](https://github.com/prowler-cloud/prowler/pull/10086)
- Registry scan mode for `image` provider: enumerate and scan all images from OCI standard, Docker Hub, and ECR [(#9985)](https://github.com/prowler-cloud/prowler/pull/9985)
- Add file descriptor limits (`ulimits`) to Docker Compose worker services to prevent `Too many open files` errors [(#10107)](https://github.com/prowler-cloud/prowler/pull/10107)

View File

@@ -156,6 +156,7 @@
}
],
"Checks": [
"defenderxdr_critical_asset_management_pending_approvals",
"sharepoint_external_sharing_managed",
"exchange_external_email_tagging_enabled"
]
@@ -455,6 +456,7 @@
"defender_antispam_outbound_policy_configured",
"defender_antispam_outbound_policy_forwarding_disabled",
"defender_antispam_policy_inbound_no_allowed_domains",
"defenderxdr_critical_asset_management_pending_approvals",
"defender_chat_report_policy_configured",
"defender_malware_policy_common_attachments_filter_enabled",
"defender_malware_policy_comprehensive_attachments_filter_applied",

View File

@@ -0,0 +1,37 @@
{
"Provider": "m365",
"CheckID": "defenderxdr_critical_asset_management_pending_approvals",
"CheckTitle": "Ensure all Critical Asset Management classifications are reviewed and approved in Microsoft Defender XDR",
"CheckType": [],
"ServiceName": "defenderxdr",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Defender XDR Critical Asset Management",
"ResourceGroup": "security",
"Description": "Assets with a lower classification confidence score in Microsoft Defender XDR must be approved by a security administrator.\n\nAsset classifications that have not yet been reviewed and approved may result in incomplete **critical asset** visibility.",
"Risk": "Stale pending approvals lead to limited visibility in Microsoft Defender XDR. **Critical assets** that are not properly identified and classified may not receive appropriate security monitoring and protections, creating gaps in the organization's security posture.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://learn.microsoft.com/en-us/security-exposure-management/classify-critical-assets",
"https://learn.microsoft.com/en-us/security-exposure-management/classify-critical-assets#review-critical-assets"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Navigate to **Microsoft Defender** at https://security.microsoft.com/\n2. Go to **Settings** > **Microsoft Defender XDR** > **Critical asset management**\n3. Review each pending approval listed in the check results\n4. Verify the correct classification for each asset\n5. Approve or reject the classification as appropriate",
"Terraform": ""
},
"Recommendation": {
"Text": "Regularly review and approve pending critical asset classifications to ensure accurate asset visibility in Microsoft Defender XDR. Stale approvals reduce the effectiveness of security monitoring and incident response for critical assets.",
"Url": "https://hub.prowler.com/check/defenderxdr_critical_asset_management_pending_approvals"
}
},
"Categories": [
"e5"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check requires Microsoft Defender XDR with Security Exposure Management enabled. The ThreatHunting.Read.All permission is required to query the ExposureGraphNodes table via the Advanced Hunting API. Approved assets will be reflected in the classification table within 24 hours."
}

View File

@@ -0,0 +1,86 @@
"""Check for pending Critical Asset Management approvals in Defender XDR.
This check identifies asset classifications with low confidence scores
that require security administrator review and approval.
"""
from typing import List
from prowler.lib.check.models import Check, CheckReportM365
from prowler.providers.m365.services.defenderxdr.defenderxdr_client import (
defenderxdr_client,
)
class defenderxdr_critical_asset_management_pending_approvals(Check):
"""Check for pending Critical Asset Management approvals in Microsoft Defender XDR.
This check queries Advanced Hunting to identify assets with low classification
confidence scores that have not been reviewed by a security administrator.
Prerequisites:
1. ThreatHunting.Read.All permission granted
2. Microsoft Defender XDR with Security Exposure Management enabled
Results:
- PASS: No pending approvals for Critical Asset Management are found.
- FAIL: At least one asset classification has pending approvals.
"""
def execute(self) -> List[CheckReportM365]:
"""Execute the check for pending Critical Asset Management approvals.
Evaluates whether there are any pending Critical Asset Management
approvals that require administrator review.
Returns:
A list of reports containing the result of the check.
"""
findings = []
pending_approvals = defenderxdr_client.pending_cam_approvals
# API call failed - likely missing ThreatHunting.Read.All permission
if pending_approvals is None:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Critical Asset Management",
resource_id="criticalAssetManagement",
)
report.status = "FAIL"
report.status_extended = (
"Unable to query Critical Asset Management status. "
"Verify that ThreatHunting.Read.All permission is granted."
)
findings.append(report)
return findings
if not pending_approvals:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Critical Asset Management",
resource_id="criticalAssetManagement",
)
report.status = "PASS"
report.status_extended = "No pending approvals for Critical Asset Management classifications are found."
findings.append(report)
else:
for approval in pending_approvals:
report = CheckReportM365(
metadata=self.metadata(),
resource=approval,
resource_name=f"CAM Classification: {approval.classification}",
resource_id=f"cam/{approval.classification}",
)
report.status = "FAIL"
assets_summary = ", ".join(approval.assets[:5])
if len(approval.assets) > 5:
assets_summary += f" and {len(approval.assets) - 5} more"
report.status_extended = (
f"Critical Asset Management classification '{approval.classification}' "
f"has {approval.pending_count} asset(s) pending approval: {assets_summary}."
)
findings.append(report)
return findings

View File

@@ -28,12 +28,15 @@ class DefenderXDR(M365Service):
- Device security posture
- Exposed credentials detection
- Vulnerability assessments
- Critical Asset Management approvals
Attributes:
mde_status: Status of MDE deployment
(None, "not_enabled", "no_devices", "active")
exposed_credentials_privileged_users: List of privileged users
with exposed credentials
pending_cam_approvals: List of pending Critical Asset Management
approvals (None if API error)
"""
def __init__(self, provider: M365Provider):
@@ -52,15 +55,19 @@ class DefenderXDR(M365Service):
self.exposed_credentials_privileged_users: Optional[
List[ExposedCredentialPrivilegedUser]
] = []
self.pending_cam_approvals: Optional[List[PendingCAMApproval]] = []
loop = self._get_event_loop()
try:
self.mde_status, self.exposed_credentials_privileged_users = (
loop.run_until_complete(
asyncio.gather(
self._check_mde_status(),
self._get_exposed_credentials_privileged_users(),
)
(
self.mde_status,
self.exposed_credentials_privileged_users,
self.pending_cam_approvals,
) = loop.run_until_complete(
asyncio.gather(
self._check_mde_status(),
self._get_exposed_credentials_privileged_users(),
self._get_pending_cam_approvals(),
)
)
finally:
@@ -222,6 +229,63 @@ ExposureGraphEdges
target_categories=target_categories,
)
async def _get_pending_cam_approvals(
self,
) -> Optional[List["PendingCAMApproval"]]:
"""Query for pending Critical Asset Management approvals.
Queries the ExposureGraphNodes table to find assets with low criticality
confidence scores that require administrator approval.
Returns:
List of PendingCAMApproval objects, or None if API call failed.
"""
logger.info(
"DefenderXDR - Querying for pending Critical Asset Management approvals..."
)
query = """
ExposureGraphNodes
| where isnotempty(parse_json(NodeProperties)['rawData']['criticalityConfidenceLow'])
| mv-expand parse_json(NodeProperties)['rawData']['criticalityConfidenceLow']
| extend Classification = tostring(NodeProperties_rawData_criticalityConfidenceLow)
| summarize PendingApproval = count(), Assets = array_sort_asc(make_set(NodeName)) by Classification
| sort by Classification asc
"""
results, _ = await self._run_hunting_query(query)
if results is None:
return None
pending_approvals = []
for row in results:
if not row:
continue
classification = row.get("Classification", "")
pending_count = int(row.get("PendingApproval", 0))
assets_raw = row.get("Assets", "[]")
if isinstance(assets_raw, str):
try:
assets = json.loads(assets_raw)
except (json.JSONDecodeError, ValueError):
assets = []
elif isinstance(assets_raw, list):
assets = assets_raw
else:
assets = []
pending_approvals.append(
PendingCAMApproval(
classification=classification,
pending_count=pending_count,
assets=assets,
)
)
return pending_approvals
class ExposedCredentialPrivilegedUser(BaseModel):
"""Model for exposed credential data of a privileged user.
@@ -239,3 +303,20 @@ class ExposedCredentialPrivilegedUser(BaseModel):
target_node_label: str
credential_type: Optional[str] = None
target_categories: list = []
class PendingCAMApproval(BaseModel):
"""Model for a pending Critical Asset Management approval classification.
Represents assets with low criticality confidence scores that require
security administrator review and approval.
Attributes:
classification: The asset classification name pending approval.
pending_count: The number of assets pending approval for this classification.
assets: List of asset names pending approval.
"""
classification: str
pending_count: int
assets: List[str]

View File

@@ -0,0 +1,218 @@
from unittest import mock
from prowler.providers.m365.services.defenderxdr.defenderxdr_service import (
PendingCAMApproval,
)
from tests.providers.m365.m365_fixtures import DOMAIN, set_mocked_m365_provider
class Test_defenderxdr_critical_asset_management_pending_approvals:
"""Tests for the defenderxdr_critical_asset_management_pending_approvals check."""
def test_api_failed_missing_permission(self):
"""Test FAIL when API call fails (None): missing ThreatHunting.Read.All permission."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.pending_cam_approvals = None
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals import (
defenderxdr_critical_asset_management_pending_approvals,
)
check = defenderxdr_critical_asset_management_pending_approvals()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
"Unable to query Critical Asset Management" in result[0].status_extended
)
assert "ThreatHunting.Read.All" in result[0].status_extended
assert result[0].resource_id == "criticalAssetManagement"
def test_no_pending_approvals_pass(self):
"""Test PASS scenario when there are no pending CAM approvals."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.pending_cam_approvals = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals import (
defenderxdr_critical_asset_management_pending_approvals,
)
check = defenderxdr_critical_asset_management_pending_approvals()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "No pending approvals for Critical Asset Management classifications are found."
)
assert result[0].resource_name == "Critical Asset Management"
assert result[0].resource_id == "criticalAssetManagement"
assert result[0].resource == {}
def test_single_pending_approval_fail(self):
"""Test FAIL scenario when there is one pending CAM approval."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.pending_cam_approvals = [
PendingCAMApproval(
classification="HighValue",
pending_count=2,
assets=["server-01", "server-02"],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals import (
defenderxdr_critical_asset_management_pending_approvals,
)
check = defenderxdr_critical_asset_management_pending_approvals()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Critical Asset Management classification 'HighValue' has 2 asset(s) pending approval: server-01, server-02."
)
assert result[0].resource_name == "CAM Classification: HighValue"
assert result[0].resource_id == "cam/HighValue"
assert (
result[0].resource == defenderxdr_client.pending_cam_approvals[0].dict()
)
def test_multiple_pending_approvals_fail(self):
"""Test FAIL scenario when there are multiple pending CAM approvals."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.pending_cam_approvals = [
PendingCAMApproval(
classification="HighValue",
pending_count=1,
assets=["server-01"],
),
PendingCAMApproval(
classification="Critical",
pending_count=3,
assets=["db-01", "db-02", "db-03"],
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals import (
defenderxdr_critical_asset_management_pending_approvals,
)
check = defenderxdr_critical_asset_management_pending_approvals()
result = check.execute()
assert len(result) == 2
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Critical Asset Management classification 'HighValue' has 1 asset(s) pending approval: server-01."
)
assert result[0].resource_name == "CAM Classification: HighValue"
assert result[0].resource_id == "cam/HighValue"
assert result[1].status == "FAIL"
assert (
result[1].status_extended
== "Critical Asset Management classification 'Critical' has 3 asset(s) pending approval: db-01, db-02, db-03."
)
assert result[1].resource_name == "CAM Classification: Critical"
assert result[1].resource_id == "cam/Critical"
def test_pending_approval_with_more_than_five_assets_fail(self):
"""Test FAIL scenario with more than 5 assets to verify truncation."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.pending_cam_approvals = [
PendingCAMApproval(
classification="HighValue",
pending_count=7,
assets=[
"server-01",
"server-02",
"server-03",
"server-04",
"server-05",
"server-06",
"server-07",
],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals import (
defenderxdr_critical_asset_management_pending_approvals,
)
check = defenderxdr_critical_asset_management_pending_approvals()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Critical Asset Management classification 'HighValue' has 7 asset(s) pending approval: server-01, server-02, server-03, server-04, server-05 and 2 more."
)
assert result[0].resource_name == "CAM Classification: HighValue"
assert result[0].resource_id == "cam/HighValue"