feat(m365): add defenderxdr_endpoint_privileged_user_exposed_credentials security check (#10084)

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
This commit is contained in:
Hugo Pereira Brito
2026-02-19 17:52:16 +01:00
committed by GitHub
parent 20b26bc7d0
commit 48b94b2a9f
12 changed files with 814 additions and 0 deletions

View File

@@ -22,6 +22,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- OpenStack compute 7 new checks [(#9944)](https://github.com/prowler-cloud/prowler/pull/9944)
- CSA CCM 4.0 for the Alibaba Cloud provider [(#10061)](https://github.com/prowler-cloud/prowler/pull/10061)
- ECS Exec (ECS-006) privilege escalation detection via `ecs:ExecuteCommand` + `ecs:DescribeTasks` [(#10066)](https://github.com/prowler-cloud/prowler/pull/10066)
- `defenderxdr_endpoint_privileged_user_exposed_credentials` check for M365 provider [(#10084)](https://github.com/prowler-cloud/prowler/pull/10084)
- Registry scan mode for `image` provider: enumerate and scan all images from OCI standard, Docker Hub, and ECR [(#9985)](https://github.com/prowler-cloud/prowler/pull/9985)
- Add file descriptor limits (`ulimits`) to Docker Compose worker services to prevent `Too many open files` errors [(#10107)](https://github.com/prowler-cloud/prowler/pull/10107)

View File

@@ -117,6 +117,7 @@
"defender_malware_policy_notifications_internal_users_malware_enabled",
"defender_safelinks_policy_enabled",
"defender_zap_for_teams_enabled",
"defenderxdr_endpoint_privileged_user_exposed_credentials",
"defender_identity_health_issues_no_open",
"entra_admin_users_phishing_resistant_mfa_enabled",
"entra_identity_protection_sign_in_risk_enabled",
@@ -232,6 +233,7 @@
}
],
"Checks": [
"defenderxdr_endpoint_privileged_user_exposed_credentials",
"entra_admin_users_sign_in_frequency_enabled",
"entra_admin_users_mfa_enabled",
"entra_admin_users_sign_in_frequency_enabled",
@@ -604,6 +606,7 @@
}
],
"Checks": [
"defenderxdr_endpoint_privileged_user_exposed_credentials",
"entra_managed_device_required_for_authentication",
"entra_users_mfa_enabled",
"entra_managed_device_required_for_mfa_registration",
@@ -631,6 +634,7 @@
"admincenter_users_admins_reduced_license_footprint",
"admincenter_users_between_two_and_four_global_admins",
"defender_antispam_outbound_policy_configured",
"defenderxdr_endpoint_privileged_user_exposed_credentials",
"entra_admin_consent_workflow_enabled",
"entra_admin_portals_access_restriction",
"entra_admin_users_cloud_only",
@@ -719,6 +723,7 @@
"defender_malware_policy_common_attachments_filter_enabled",
"defender_malware_policy_comprehensive_attachments_filter_applied",
"defender_malware_policy_notifications_internal_users_malware_enabled",
"defenderxdr_endpoint_privileged_user_exposed_credentials",
"defender_identity_health_issues_no_open"
]
},

View File

@@ -387,6 +387,7 @@
"Id": "1.2.4",
"Description": "Enable Identity Protection user risk policies",
"Checks": [
"defenderxdr_endpoint_privileged_user_exposed_credentials",
"entra_identity_protection_user_risk_enabled"
],
"Attributes": [

View File

@@ -0,0 +1,4 @@
from prowler.providers.common.provider import Provider
from prowler.providers.m365.services.defenderxdr.defenderxdr_service import DefenderXDR
defenderxdr_client = DefenderXDR(Provider.get_global_provider())

View File

@@ -0,0 +1,39 @@
{
"Provider": "m365",
"CheckID": "defenderxdr_endpoint_privileged_user_exposed_credentials",
"CheckTitle": "Privileged users do not have credentials exposed on vulnerable endpoints",
"CheckType": [],
"ServiceName": "defenderxdr",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "critical",
"ResourceType": "Exposure Management",
"ResourceGroup": "security",
"Description": "Privileged users may have authentication artifacts (CLI secrets, cookies, tokens) exposed on endpoints with high risk scores. Microsoft Defender XDR's Security Exposure Management detects when credentials from users with Entra ID privileged roles are present on vulnerable devices.",
"Risk": "Exposed credentials on vulnerable endpoints enable account takeover through stolen tokens or cookies, Conditional Access bypass via primary refresh tokens, lateral movement to sensitive resources, and persistence until tokens are explicitly revoked.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://learn.microsoft.com/en-us/security-exposure-management/prerequisites",
"https://learn.microsoft.com/en-us/defender-xdr/advanced-hunting-exposuregraphedges-table"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Navigate to Microsoft Defender portal at https://security.microsoft.com\n2. Go to Exposure Management > Attack surface > Attack paths\n3. Review the exposed credential findings for privileged users\n4. For each affected device, review the risk and exposure score in Device Inventory\n5. Remediate endpoint vulnerabilities and improve device security posture\n6. Revoke affected user sessions and rotate credentials\n7. Consider implementing Privileged Access Workstations (PAWs) for privileged users",
"Terraform": ""
},
"Recommendation": {
"Text": "Privileged users should only authenticate from secure, hardened devices with low exposure scores. Implement Privileged Access Workstations (PAWs) and enforce device compliance policies through Conditional Access to prevent credential exposure on vulnerable endpoints.",
"Url": "https://hub.prowler.com/check/defenderxdr_endpoint_privileged_user_exposed_credentials"
}
},
"Categories": [
"secrets",
"identity-access",
"e5"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check requires Microsoft Defender XDR with Security Exposure Management enabled. The ThreatHunting.Read.All permission is required to query the ExposureGraphEdges table via the Advanced Hunting API."
}

View File

@@ -0,0 +1,148 @@
"""Check for exposed credentials of privileged users in Defender XDR.
This check identifies privileged users whose authentication credentials
(CLI secrets, cookies, tokens) are exposed on vulnerable endpoints.
"""
from prowler.lib.check.models import Check, CheckReportM365
from prowler.providers.m365.services.defenderxdr.defenderxdr_client import (
defenderxdr_client,
)
class defenderxdr_endpoint_privileged_user_exposed_credentials(Check):
"""Check if privileged users have exposed credentials on endpoints.
This check queries Microsoft Defender XDR's ExposureGraphEdges
table via the Advanced Hunting API to identify privileged users whose
authentication artifacts (CLI secrets, user cookies, sensitive tokens)
are exposed on endpoints with high risk or exposure scores.
Prerequisites:
1. ThreatHunting.Read.All permission granted
2. Microsoft Defender for Endpoint (MDE) enabled and deployed on devices
Results:
- PASS: No exposed credentials found OR MDE enabled but no devices
- FAIL: Exposed credentials detected OR MDE not enabled (blind spot)
"""
def execute(self) -> list[CheckReportM365]:
"""Execute the check for exposed credentials of privileged users.
Returns:
List[CheckReportM365]: A list of reports with check results.
"""
findings = []
# Step 1: Check MDE status
mde_status = defenderxdr_client.mde_status
# API call failed - likely missing ThreatHunting.Read.All permission
if mde_status is None:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender XDR",
resource_id="mdeStatus",
)
report.status = "FAIL"
report.status_extended = (
"Unable to query Microsoft Defender XDR status. "
"Verify that ThreatHunting.Read.All permission is granted."
)
findings.append(report)
return findings
# MDE not enabled - this is a security blind spot
if mde_status == "not_enabled":
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender XDR",
resource_id="mdeStatus",
)
report.status = "FAIL"
report.status_extended = (
"Microsoft Defender for Endpoint is not enabled. "
"Without MDE there is no visibility into credential "
"exposure on endpoints."
)
findings.append(report)
return findings
# MDE enabled but no devices - PASS (no endpoints to evaluate)
if mde_status == "no_devices":
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender XDR",
resource_id="mdeDevices",
)
report.status = "PASS"
report.status_extended = (
"Microsoft Defender for Endpoint is enabled but no devices "
"are onboarded. No endpoints to evaluate for credential "
"exposure."
)
findings.append(report)
return findings
# Step 2: MDE is active with devices - check for exposed credentials
exposed_credentials = defenderxdr_client.exposed_credentials_privileged_users
# API call failed for exposed credentials query
if exposed_credentials is None:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender XDR",
resource_id="exposedCredentials",
)
report.status = "FAIL"
report.status_extended = (
"Unable to query Security Exposure Management for exposed "
"credentials. Verify that Security Exposure Management "
"is enabled."
)
findings.append(report)
return findings
# Found exposed credentials - report each one
if exposed_credentials:
for exposed_user in exposed_credentials:
report = CheckReportM365(
metadata=self.metadata(),
resource=exposed_user,
resource_name=exposed_user.target_node_name,
resource_id=(exposed_user.target_node_id or exposed_user.edge_id),
)
report.status = "FAIL"
credential_info = (
f" ({exposed_user.credential_type})"
if exposed_user.credential_type
else ""
)
report.status_extended = (
f"Privileged user {exposed_user.target_node_name} has "
f"exposed credentials{credential_info} on device "
f"{exposed_user.source_node_name}."
)
findings.append(report)
else:
# No exposed credentials found - full visibility, no risk detected
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender XDR Exposure Management",
resource_id="exposedCredentials",
)
report.status = "PASS"
report.status_extended = (
"No exposed credentials found for privileged users on "
"vulnerable endpoints."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,241 @@
"""Microsoft Defender XDR service module.
This module provides access to Microsoft Defender XDR data
through the Microsoft Graph Security Advanced Hunting API.
"""
import asyncio
import json
from typing import Dict, List, Optional
from msgraph.generated.security.microsoft_graph_security_run_hunting_query.run_hunting_query_post_request_body import (
RunHuntingQueryPostRequestBody,
)
from pydantic.v1 import BaseModel
from prowler.lib.logger import logger
from prowler.providers.m365.lib.service.service import M365Service
from prowler.providers.m365.m365_provider import M365Provider
class DefenderXDR(M365Service):
"""Microsoft Defender XDR service class.
Provides access to Microsoft Defender XDR data through
the Microsoft Graph Security Advanced Hunting API.
This class handles endpoint security checks including:
- Device security posture
- Exposed credentials detection
- Vulnerability assessments
Attributes:
mde_status: Status of MDE deployment
(None, "not_enabled", "no_devices", "active")
exposed_credentials_privileged_users: List of privileged users
with exposed credentials
"""
def __init__(self, provider: M365Provider):
"""Initialize the DefenderXDR service client.
Args:
provider: The M365Provider instance for authentication.
"""
super().__init__(provider)
# MDE status: None = API error, "not_enabled" = table not found,
# "no_devices" = enabled but empty, "active" = has devices
self.mde_status: Optional[str] = None
# Check data
self.exposed_credentials_privileged_users: Optional[
List[ExposedCredentialPrivilegedUser]
] = []
loop = self._get_event_loop()
try:
self.mde_status, self.exposed_credentials_privileged_users = (
loop.run_until_complete(
asyncio.gather(
self._check_mde_status(),
self._get_exposed_credentials_privileged_users(),
)
)
)
finally:
self._cleanup_event_loop(loop)
def _get_event_loop(self) -> asyncio.AbstractEventLoop:
"""Get or create an event loop for async operations."""
try:
loop = asyncio.get_running_loop()
if loop.is_running():
raise RuntimeError(
"Cannot initialize DefenderXDR service while event loop is running"
)
return loop
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
def _cleanup_event_loop(self, loop: asyncio.AbstractEventLoop) -> None:
"""Clean up the event loop if we created it."""
try:
if loop and not loop.is_running():
asyncio.set_event_loop(None)
loop.close()
except Exception as error:
# Best-effort cleanup: swallow errors but log them for diagnostics
logger.debug(f"DefenderXDR - Failed to clean up event loop: {error}")
async def _run_hunting_query(self, query: str) -> tuple[Optional[List[Dict]], bool]:
"""Execute an Advanced Hunting query using Microsoft Graph Security API.
Args:
query: The KQL (Kusto Query Language) query to execute.
Returns:
Tuple of (results, table_not_found):
- results: List of result dicts, empty list if no results,
None if API error.
- table_not_found: True if query failed because table
doesn't exist.
"""
try:
request_body = RunHuntingQueryPostRequestBody(query=query)
response = await self.client.security.microsoft_graph_security_run_hunting_query.post(
request_body
)
if not response or not response.results:
return [], False
results = [
row.additional_data
for row in response.results
if hasattr(row, "additional_data")
]
return results, False
except Exception as error:
error_message = str(error).lower()
if (
"failed to resolve table" in error_message
or "could not find table" in error_message
):
logger.warning(f"DefenderXDR - Table not found in query: {error}")
return [], True
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return None, False
async def _check_mde_status(self) -> Optional[str]:
"""Check Microsoft Defender for Endpoint status.
Returns:
- None: API call failed (permission issue)
- "not_enabled": DeviceInfo table doesn't exist (MDE not enabled)
- "no_devices": MDE enabled but no devices onboarded
- "active": MDE enabled with devices reporting
"""
logger.info("DefenderXDR - Checking MDE status...")
query = "DeviceInfo | summarize DeviceCount = count()"
results, table_not_found = await self._run_hunting_query(query)
if results is None:
return None
if table_not_found:
return "not_enabled"
if results and len(results) > 0:
device_count = results[0].get("DeviceCount", 0)
if device_count > 0:
return "active"
return "no_devices"
async def _get_exposed_credentials_privileged_users(
self,
) -> Optional[List["ExposedCredentialPrivilegedUser"]]:
"""Query for privileged users with exposed credentials.
Returns:
List of ExposedCredentialPrivilegedUser objects,
or None if API call failed.
"""
logger.info(
"DefenderXDR - Querying for exposed credentials of privileged users..."
)
query = """
ExposureGraphEdges
| where EdgeLabel == "hasCredentialsFor"
| where TargetNodeLabel == "user"
| extend targetCategories = parse_json(TargetNodeCategories)
| where targetCategories has "PrivilegedEntraIdRole" or targetCategories has "privileged"
| extend credentialType = tostring(parse_json(EdgeProperties).credentialType)
| project
EdgeId,
SourceNodeId,
SourceNodeName,
SourceNodeLabel,
TargetNodeId,
TargetNodeName,
TargetNodeLabel,
CredentialType = credentialType,
TargetCategories = TargetNodeCategories
"""
results, _ = await self._run_hunting_query(query)
if results is None:
return None
return [self._parse_exposed_credential(row) for row in results if row]
def _parse_exposed_credential(self, row: Dict) -> "ExposedCredentialPrivilegedUser":
"""Parse a single row into an ExposedCredentialPrivilegedUser."""
target_categories = row.get("TargetCategories", [])
if isinstance(target_categories, str):
try:
target_categories = json.loads(target_categories)
except (json.JSONDecodeError, ValueError):
target_categories = []
return ExposedCredentialPrivilegedUser(
edge_id=str(row.get("EdgeId", "")),
source_node_id=str(row.get("SourceNodeId", "")),
source_node_name=str(row.get("SourceNodeName", "Unknown")),
source_node_label=str(row.get("SourceNodeLabel", "")),
target_node_id=str(row.get("TargetNodeId", "")),
target_node_name=str(row.get("TargetNodeName", "Unknown")),
target_node_label=str(row.get("TargetNodeLabel", "")),
credential_type=str(row.get("CredentialType") or "Unknown"),
target_categories=target_categories,
)
class ExposedCredentialPrivilegedUser(BaseModel):
"""Model for exposed credential data of a privileged user.
Represents authentication credentials (CLI secrets, user cookies, tokens)
of privileged users that are exposed on vulnerable endpoints.
"""
edge_id: str
source_node_id: str
source_node_name: str
source_node_label: str
target_node_id: str
target_node_name: str
target_node_label: str
credential_type: Optional[str] = None
target_categories: list = []

View File

@@ -0,0 +1,375 @@
from unittest import mock
from tests.providers.m365.m365_fixtures import DOMAIN, set_mocked_m365_provider
class Test_defenderxdr_endpoint_privileged_user_exposed_credentials:
"""Tests for the defenderxdr_endpoint_privileged_user_exposed_credentials check."""
def test_mde_status_api_failed(self):
"""Test FAIL when MDE status API call fails (None): missing permission."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = None
defenderxdr_client.exposed_credentials_privileged_users = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "Unable to query Microsoft Defender XDR" in result[0].status_extended
assert "ThreatHunting.Read.All" in result[0].status_extended
assert result[0].resource_id == "mdeStatus"
def test_mde_not_enabled(self):
"""Test FAIL when MDE is not enabled - security blind spot."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "not_enabled"
defenderxdr_client.exposed_credentials_privileged_users = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
"Microsoft Defender for Endpoint is not enabled"
in result[0].status_extended
)
assert "no visibility" in result[0].status_extended
assert result[0].resource_id == "mdeStatus"
def test_mde_no_devices(self):
"""Test PASS when MDE is enabled but no devices are onboarded."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "no_devices"
defenderxdr_client.exposed_credentials_privileged_users = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert "enabled but no devices are onboarded" in result[0].status_extended
assert "No endpoints to evaluate" in result[0].status_extended
assert result[0].resource_id == "mdeDevices"
def test_exposed_credentials_query_failed(self):
"""Test FAIL when exposed credentials query fails (None)."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "active"
defenderxdr_client.exposed_credentials_privileged_users = None
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
"Unable to query Security Exposure Management"
in result[0].status_extended
)
assert result[0].resource_id == "exposedCredentials"
def test_no_exposed_credentials(self):
"""Test PASS when no privileged users have exposed credentials."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "active"
defenderxdr_client.exposed_credentials_privileged_users = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
"No exposed credentials found for privileged users"
in result[0].status_extended
)
assert result[0].resource_name == "Defender XDR Exposure Management"
assert result[0].resource_id == "exposedCredentials"
def test_single_exposed_credential_with_credential_type(self):
"""Test FAIL when a privileged user has exposed credentials with type."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "active"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
from prowler.providers.m365.services.defenderxdr.defenderxdr_service import (
ExposedCredentialPrivilegedUser,
)
exposed_user = ExposedCredentialPrivilegedUser(
edge_id="edge-123",
source_node_id="device-456",
source_node_name="WORKSTATION01",
source_node_label="device",
target_node_id="user-789",
target_node_name="admin@contoso.com",
target_node_label="user",
credential_type="CLI secret",
target_categories=["PrivilegedEntraIdRole"],
)
defenderxdr_client.exposed_credentials_privileged_users = [exposed_user]
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "admin@contoso.com" in result[0].status_extended
assert "CLI secret" in result[0].status_extended
assert "WORKSTATION01" in result[0].status_extended
assert result[0].resource_name == "admin@contoso.com"
assert result[0].resource_id == "user-789"
def test_single_exposed_credential_without_credential_type(self):
"""Test FAIL when a privileged user has exposed credentials without type."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "active"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
from prowler.providers.m365.services.defenderxdr.defenderxdr_service import (
ExposedCredentialPrivilegedUser,
)
exposed_user = ExposedCredentialPrivilegedUser(
edge_id="edge-123",
source_node_id="device-456",
source_node_name="WORKSTATION01",
source_node_label="device",
target_node_id="user-789",
target_node_name="admin@contoso.com",
target_node_label="user",
credential_type=None,
target_categories=["PrivilegedEntraIdRole"],
)
defenderxdr_client.exposed_credentials_privileged_users = [exposed_user]
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "admin@contoso.com" in result[0].status_extended
assert "WORKSTATION01" in result[0].status_extended
assert result[0].resource_name == "admin@contoso.com"
assert result[0].resource_id == "user-789"
def test_multiple_exposed_credentials(self):
"""Test FAIL for multiple privileged users with exposed credentials."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "active"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
from prowler.providers.m365.services.defenderxdr.defenderxdr_service import (
ExposedCredentialPrivilegedUser,
)
exposed_user_1 = ExposedCredentialPrivilegedUser(
edge_id="edge-123",
source_node_id="device-456",
source_node_name="WORKSTATION01",
source_node_label="device",
target_node_id="user-789",
target_node_name="admin@contoso.com",
target_node_label="user",
credential_type="CLI secret",
target_categories=["PrivilegedEntraIdRole"],
)
exposed_user_2 = ExposedCredentialPrivilegedUser(
edge_id="edge-456",
source_node_id="device-789",
source_node_name="SERVER01",
source_node_label="device",
target_node_id="user-012",
target_node_name="globaladmin@contoso.com",
target_node_label="user",
credential_type="user cookie",
target_categories=["PrivilegedEntraIdRole", "privileged"],
)
defenderxdr_client.exposed_credentials_privileged_users = [
exposed_user_1,
exposed_user_2,
]
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 2
assert result[0].status == "FAIL"
assert result[0].resource_name == "admin@contoso.com"
assert result[1].status == "FAIL"
assert result[1].resource_name == "globaladmin@contoso.com"
def test_exposed_credential_uses_edge_id_when_target_node_id_missing(self):
"""Test that edge_id is used as resource_id when target_node_id is empty."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "active"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
from prowler.providers.m365.services.defenderxdr.defenderxdr_service import (
ExposedCredentialPrivilegedUser,
)
exposed_user = ExposedCredentialPrivilegedUser(
edge_id="edge-fallback-123",
source_node_id="device-456",
source_node_name="WORKSTATION01",
source_node_label="device",
target_node_id="",
target_node_name="admin@contoso.com",
target_node_label="user",
credential_type="sensitive token",
target_categories=["PrivilegedEntraIdRole"],
)
defenderxdr_client.exposed_credentials_privileged_users = [exposed_user]
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "edge-fallback-123"
assert result[0].resource_name == "admin@contoso.com"