Compare commits

...

5 Commits

Author SHA1 Message Date
Andoni A.
512986a6b9 Merge branch 'master' into feat/PROWLER-943-image-provider-ui 2026-02-25 08:38:34 +01:00
Andoni A.
a7dc172cd3 fix(image): route bare registry hostnames to OCI catalog in test_connection
The API passes provider.uid (a bare hostname) to test_connection(),
which misclassified it as a Docker Hub image reference and failed auth
against Docker Hub instead of the target registry.

Add _is_registry_url() to detect bare hostnames (dots in host, no
slash) and _test_registry_connection() to test connectivity via the
OCI catalog endpoint. Also fix the UI placeholder to show a valid
example without https://.
2026-02-24 16:12:29 +01:00
Andoni A.
a2766ba28b feat(image): enable misconfig scanner by default for image scans 2026-02-24 16:05:10 +01:00
Andoni A.
06dbc6bf8e fix(ui): add cross-field validation and fix naming collision for image provider
- Add superRefine validation requiring registry username when password is provided
- Rename PROVIDER_ICONS to PROVIDER_BADGE_BY_NAME to avoid collision with provider-icon-cell
2026-02-24 15:42:15 +01:00
Andoni A.
4b36be1f11 feat(ui): add image (Container Registry) provider support 2026-02-24 15:02:33 +01:00
55 changed files with 446 additions and 3199 deletions

View File

@@ -41,12 +41,10 @@ When using service principal authentication, add these **Application Permissions
- `AuditLog.Read.All`: Required for Entra service.
- `Directory.Read.All`: Required for all services.
- `OnPremDirectorySynchronization.Read.All`: Required for `entra_seamless_sso_disabled` check (hybrid deployments).
- `Policy.Read.All`: Required for all services.
- `SecurityIdentitiesHealth.Read.All`: Required for `defenderidentity_health_issues_no_open` check.
- `SecurityIdentitiesSensors.Read.All`: Required for `defenderidentity_health_issues_no_open` check.
- `SharePointTenantSettings.Read.All`: Required for SharePoint service.
- `ThreatHunting.Read.All`: Required for Defender XDR checks (`defenderxdr_endpoint_privileged_user_exposed_credentials`, `defenderxdr_critical_asset_management_pending_approvals`).
**External API Permissions:**
@@ -109,7 +107,6 @@ Browser and Azure CLI authentication methods limit scanning capabilities to chec
- `AuditLog.Read.All`: Required for Entra service
- `Directory.Read.All`: Required for all services
- `OnPremDirectorySynchronization.Read.All`: Required for `entra_seamless_sso_disabled` check (hybrid deployments)
- `Policy.Read.All`: Required for all services
- `SecurityIdentitiesHealth.Read.All`: Required for `defenderidentity_health_issues_no_open` check
- `SecurityIdentitiesSensors.Read.All`: Required for `defenderidentity_health_issues_no_open` check

View File

@@ -6,7 +6,6 @@ All notable changes to the **Prowler SDK** are documented in this file.
### 🚀 Added
- `entra_app_registration_no_unused_privileged_permissions` check for m365 provider [(#10080)](https://github.com/prowler-cloud/prowler/pull/10080)
- `defenderidentity_health_issues_no_open` check for M365 provider [(#10087)](https://github.com/prowler-cloud/prowler/pull/10087)
- `organization_verified_badge` check for GitHub provider [(#10033)](https://github.com/prowler-cloud/prowler/pull/10033)
- OpenStack provider `clouds_yaml_content` parameter for API integration [(#10003)](https://github.com/prowler-cloud/prowler/pull/10003)
@@ -22,9 +21,6 @@ All notable changes to the **Prowler SDK** are documented in this file.
- OpenStack compute 7 new checks [(#9944)](https://github.com/prowler-cloud/prowler/pull/9944)
- CSA CCM 4.0 for the Alibaba Cloud provider [(#10061)](https://github.com/prowler-cloud/prowler/pull/10061)
- ECS Exec (ECS-006) privilege escalation detection via `ecs:ExecuteCommand` + `ecs:DescribeTasks` [(#10066)](https://github.com/prowler-cloud/prowler/pull/10066)
- `defenderxdr_endpoint_privileged_user_exposed_credentials` check for M365 provider [(#10084)](https://github.com/prowler-cloud/prowler/pull/10084)
- `defenderxdr_critical_asset_management_pending_approvals` check for M365 provider [(#10085)](https://github.com/prowler-cloud/prowler/pull/10085)
- `entra_seamless_sso_disabled` check for m365 provider [(#10086)](https://github.com/prowler-cloud/prowler/pull/10086)
- Registry scan mode for `image` provider: enumerate and scan all images from OCI standard, Docker Hub, and ECR [(#9985)](https://github.com/prowler-cloud/prowler/pull/9985)
- Add file descriptor limits (`ulimits`) to Docker Compose worker services to prevent `Too many open files` errors [(#10107)](https://github.com/prowler-cloud/prowler/pull/10107)

View File

@@ -983,7 +983,6 @@
"Id": "5.1.5.1",
"Description": "Control when end users and group owners are allowed to grant consent to applications, and when they will be required to request administrator review and approval. Allowing users to grant apps access to data helps them acquire useful applications and be productive but can represent a risk in some situations if it's not monitored and controlled carefully.",
"Checks": [
"entra_app_registration_no_unused_privileged_permissions",
"entra_policy_restricts_user_consent_for_apps"
],
"Attributes": [

View File

@@ -1215,7 +1215,6 @@
"Id": "5.1.5.1",
"Description": "User consent to apps accessing company data on their behalf allows users to grant permissions to applications without administrator involvement. The recommended state is Do not allow user consent.",
"Checks": [
"entra_app_registration_no_unused_privileged_permissions",
"entra_policy_restricts_user_consent_for_apps"
],
"Attributes": [

View File

@@ -117,7 +117,6 @@
"defender_malware_policy_notifications_internal_users_malware_enabled",
"defender_safelinks_policy_enabled",
"defender_zap_for_teams_enabled",
"defenderxdr_endpoint_privileged_user_exposed_credentials",
"defender_identity_health_issues_no_open",
"entra_admin_users_phishing_resistant_mfa_enabled",
"entra_identity_protection_sign_in_risk_enabled",
@@ -156,7 +155,6 @@
}
],
"Checks": [
"defenderxdr_critical_asset_management_pending_approvals",
"sharepoint_external_sharing_managed",
"exchange_external_email_tagging_enabled"
]
@@ -202,8 +200,7 @@
"admincenter_users_admins_reduced_license_footprint",
"entra_admin_portals_access_restriction",
"entra_admin_users_phishing_resistant_mfa_enabled",
"entra_policy_guest_users_access_restrictions",
"entra_seamless_sso_disabled"
"entra_policy_guest_users_access_restrictions"
]
},
{
@@ -219,8 +216,7 @@
}
],
"Checks": [
"admincenter_settings_password_never_expire",
"entra_seamless_sso_disabled"
"admincenter_settings_password_never_expire"
]
},
{
@@ -236,13 +232,11 @@
}
],
"Checks": [
"defenderxdr_endpoint_privileged_user_exposed_credentials",
"entra_admin_users_sign_in_frequency_enabled",
"entra_admin_users_mfa_enabled",
"entra_admin_users_sign_in_frequency_enabled",
"entra_legacy_authentication_blocked",
"entra_managed_device_required_for_authentication",
"entra_seamless_sso_disabled",
"entra_users_mfa_enabled",
"exchange_organization_modern_authentication_enabled",
"exchange_transport_config_smtp_auth_disabled",
@@ -262,12 +256,11 @@
}
],
"Checks": [
"entra_admin_portals_access_restriction",
"entra_app_registration_no_unused_privileged_permissions",
"entra_policy_guest_users_access_restrictions",
"sharepoint_external_sharing_managed",
"sharepoint_external_sharing_restricted",
"sharepoint_guest_sharing_restricted"
"sharepoint_external_sharing_managed",
"sharepoint_guest_sharing_restricted",
"entra_policy_guest_users_access_restrictions",
"entra_admin_portals_access_restriction"
]
},
{
@@ -456,7 +449,6 @@
"defender_antispam_outbound_policy_configured",
"defender_antispam_outbound_policy_forwarding_disabled",
"defender_antispam_policy_inbound_no_allowed_domains",
"defenderxdr_critical_asset_management_pending_approvals",
"defender_chat_report_policy_configured",
"defender_malware_policy_common_attachments_filter_enabled",
"defender_malware_policy_comprehensive_attachments_filter_applied",
@@ -611,7 +603,6 @@
}
],
"Checks": [
"defenderxdr_endpoint_privileged_user_exposed_credentials",
"entra_managed_device_required_for_authentication",
"entra_users_mfa_enabled",
"entra_managed_device_required_for_mfa_registration",
@@ -639,17 +630,14 @@
"admincenter_users_admins_reduced_license_footprint",
"admincenter_users_between_two_and_four_global_admins",
"defender_antispam_outbound_policy_configured",
"defenderxdr_endpoint_privileged_user_exposed_credentials",
"entra_admin_consent_workflow_enabled",
"entra_admin_portals_access_restriction",
"entra_admin_users_cloud_only",
"entra_admin_users_mfa_enabled",
"entra_admin_users_phishing_resistant_mfa_enabled",
"entra_admin_users_sign_in_frequency_enabled",
"entra_app_registration_no_unused_privileged_permissions",
"entra_policy_ensure_default_user_cannot_create_tenants",
"entra_policy_guest_invite_only_for_admin_roles",
"entra_seamless_sso_disabled"
"entra_policy_guest_invite_only_for_admin_roles"
]
},
{
@@ -686,7 +674,6 @@
"entra_admin_users_sign_in_frequency_enabled",
"entra_admin_users_mfa_enabled",
"entra_managed_device_required_for_authentication",
"entra_seamless_sso_disabled",
"entra_users_mfa_enabled",
"entra_identity_protection_sign_in_risk_enabled"
]
@@ -730,7 +717,6 @@
"defender_malware_policy_common_attachments_filter_enabled",
"defender_malware_policy_comprehensive_attachments_filter_applied",
"defender_malware_policy_notifications_internal_users_malware_enabled",
"defenderxdr_endpoint_privileged_user_exposed_credentials",
"defender_identity_health_issues_no_open"
]
},
@@ -781,9 +767,8 @@
}
],
"Checks": [
"entra_app_registration_no_unused_privileged_permissions",
"entra_policy_restricts_user_consent_for_apps",
"entra_thirdparty_integrated_apps_not_allowed",
"entra_policy_restricts_user_consent_for_apps",
"teams_external_domains_restricted",
"teams_external_users_cannot_start_conversations"
]
@@ -874,10 +859,9 @@
}
],
"Checks": [
"entra_policy_restricts_user_consent_for_apps",
"admincenter_users_admins_reduced_license_footprint",
"defender_malware_policy_comprehensive_attachments_filter_applied",
"entra_app_registration_no_unused_privileged_permissions",
"entra_policy_restricts_user_consent_for_apps",
"entra_thirdparty_integrated_apps_not_allowed",
"sharepoint_modern_authentication_required"
]

View File

@@ -387,7 +387,6 @@
"Id": "1.2.4",
"Description": "Enable Identity Protection user risk policies",
"Checks": [
"defenderxdr_endpoint_privileged_user_exposed_credentials",
"entra_identity_protection_user_risk_enabled"
],
"Attributes": [
@@ -713,7 +712,6 @@
"Id": "1.3.3",
"Description": "Ensure third party integrated applications are not allowed",
"Checks": [
"entra_app_registration_no_unused_privileged_permissions",
"entra_thirdparty_integrated_apps_not_allowed"
],
"Attributes": [
@@ -750,7 +748,6 @@
"Id": "1.3.5",
"Description": "Ensure user consent to apps accessing company data on their behalf is not allowed",
"Checks": [
"entra_app_registration_no_unused_privileged_permissions",
"entra_policy_restricts_user_consent_for_apps"
],
"Attributes": [
@@ -1148,8 +1145,7 @@
"Id": "4.1.2",
"Description": "Ensure that password hash sync is enabled for hybrid deployments",
"Checks": [
"entra_password_hash_sync_enabled",
"entra_seamless_sso_disabled"
"entra_password_hash_sync_enabled"
],
"Attributes": [
{

View File

@@ -85,7 +85,9 @@ class ImageProvider(Provider):
self.images = images if images is not None else []
self.image_list_file = image_list_file
self.scanners = scanners if scanners is not None else ["vuln", "secret"]
self.scanners = (
scanners if scanners is not None else ["vuln", "secret", "misconfig"]
)
self.image_config_scanners = (
image_config_scanners if image_config_scanners is not None else []
)
@@ -319,6 +321,25 @@ class ImageProvider(Provider):
return parts[0]
return None
@staticmethod
def _is_registry_url(image_uid: str) -> bool:
"""Determine whether an image UID is a registry URL (namespace only).
Bare hostnames like "714274078102.dkr.ecr.eu-west-1.amazonaws.com"
or "myregistry.com:5000" are registry URLs (dots in host, no slash).
Image references like "alpine:3.18" or "nginx" are not.
"""
if "/" not in image_uid:
host_part = image_uid.split(":")[0]
if "." in host_part:
return True
registry_host = ImageProvider._extract_registry(image_uid)
if not registry_host:
return False
repo_and_tag = image_uid[len(registry_host) + 1 :]
return "/" not in repo_and_tag and ":" not in repo_and_tag
def cleanup(self) -> None:
"""Clean up any resources after scanning."""
@@ -850,6 +871,9 @@ class ImageProvider(Provider):
"""
Test connection to container registry by attempting to inspect an image.
For bare registry hostnames (e.g. ECR URLs passed by the API as provider_uid),
uses the OCI catalog endpoint instead of trivy image.
Args:
image: Container image to test
raise_on_exception: Whether to raise exceptions
@@ -868,6 +892,16 @@ class ImageProvider(Provider):
if not image:
return Connection(is_connected=False, error="Image name is required")
# Registry URL (bare hostname) → test via OCI catalog
if ImageProvider._is_registry_url(image):
return ImageProvider._test_registry_connection(
registry_url=image,
registry_username=registry_username,
registry_password=registry_password,
registry_token=registry_token,
)
# Image reference → test via trivy
# Build env with registry credentials
env = dict(os.environ)
if registry_username and registry_password:
@@ -928,3 +962,37 @@ class ImageProvider(Provider):
is_connected=False,
error=f"Unexpected error: {str(error)}",
)
@staticmethod
def _test_registry_connection(
registry_url: str,
registry_username: str | None = None,
registry_password: str | None = None,
registry_token: str | None = None,
) -> "Connection":
"""Test connection to a registry URL by listing repositories via OCI catalog."""
try:
adapter = create_registry_adapter(
registry_url=registry_url,
username=registry_username,
password=registry_password,
token=registry_token,
)
adapter.list_repositories()
return Connection(is_connected=True)
except Exception as error:
error_str = str(error).lower()
if "401" in error_str or "unauthorized" in error_str:
return Connection(
is_connected=False,
error="Authentication failed. Check registry credentials.",
)
elif "404" in error_str or "not found" in error_str:
return Connection(
is_connected=False,
error="Registry catalog not found.",
)
return Connection(
is_connected=False,
error=f"Failed to connect to registry: {str(error)[:200]}",
)

View File

@@ -50,9 +50,9 @@ def init_parser(self):
"--scanner",
dest="scanners",
nargs="+",
default=["vuln", "secret"],
default=["vuln", "secret", "misconfig"],
choices=SCANNERS_CHOICES,
help="Trivy scanners to use. Default: vuln, secret. Available: vuln, secret, misconfig, license",
help="Trivy scanners to use. Default: vuln, secret, misconfig. Available: vuln, secret, misconfig, license",
)
scan_config_group.add_argument(

View File

@@ -1,4 +0,0 @@
from prowler.providers.common.provider import Provider
from prowler.providers.m365.services.defenderxdr.defenderxdr_service import DefenderXDR
defenderxdr_client = DefenderXDR(Provider.get_global_provider())

View File

@@ -1,37 +0,0 @@
{
"Provider": "m365",
"CheckID": "defenderxdr_critical_asset_management_pending_approvals",
"CheckTitle": "Ensure all Critical Asset Management classifications are reviewed and approved in Microsoft Defender XDR",
"CheckType": [],
"ServiceName": "defenderxdr",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Defender XDR Critical Asset Management",
"ResourceGroup": "security",
"Description": "Assets with a lower classification confidence score in Microsoft Defender XDR must be approved by a security administrator.\n\nAsset classifications that have not yet been reviewed and approved may result in incomplete **critical asset** visibility.",
"Risk": "Stale pending approvals lead to limited visibility in Microsoft Defender XDR. **Critical assets** that are not properly identified and classified may not receive appropriate security monitoring and protections, creating gaps in the organization's security posture.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://learn.microsoft.com/en-us/security-exposure-management/classify-critical-assets",
"https://learn.microsoft.com/en-us/security-exposure-management/classify-critical-assets#review-critical-assets"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Navigate to **Microsoft Defender** at https://security.microsoft.com/\n2. Go to **Settings** > **Microsoft Defender XDR** > **Critical asset management**\n3. Review each pending approval listed in the check results\n4. Verify the correct classification for each asset\n5. Approve or reject the classification as appropriate",
"Terraform": ""
},
"Recommendation": {
"Text": "Regularly review and approve pending critical asset classifications to ensure accurate asset visibility in Microsoft Defender XDR. Stale approvals reduce the effectiveness of security monitoring and incident response for critical assets.",
"Url": "https://hub.prowler.com/check/defenderxdr_critical_asset_management_pending_approvals"
}
},
"Categories": [
"e5"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check requires Microsoft Defender XDR with Security Exposure Management enabled. The ThreatHunting.Read.All permission is required to query the ExposureGraphNodes table via the Advanced Hunting API. Approved assets will be reflected in the classification table within 24 hours."
}

View File

@@ -1,86 +0,0 @@
"""Check for pending Critical Asset Management approvals in Defender XDR.
This check identifies asset classifications with low confidence scores
that require security administrator review and approval.
"""
from typing import List
from prowler.lib.check.models import Check, CheckReportM365
from prowler.providers.m365.services.defenderxdr.defenderxdr_client import (
defenderxdr_client,
)
class defenderxdr_critical_asset_management_pending_approvals(Check):
"""Check for pending Critical Asset Management approvals in Microsoft Defender XDR.
This check queries Advanced Hunting to identify assets with low classification
confidence scores that have not been reviewed by a security administrator.
Prerequisites:
1. ThreatHunting.Read.All permission granted
2. Microsoft Defender XDR with Security Exposure Management enabled
Results:
- PASS: No pending approvals for Critical Asset Management are found.
- FAIL: At least one asset classification has pending approvals.
"""
def execute(self) -> List[CheckReportM365]:
"""Execute the check for pending Critical Asset Management approvals.
Evaluates whether there are any pending Critical Asset Management
approvals that require administrator review.
Returns:
A list of reports containing the result of the check.
"""
findings = []
pending_approvals = defenderxdr_client.pending_cam_approvals
# API call failed - likely missing ThreatHunting.Read.All permission
if pending_approvals is None:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Critical Asset Management",
resource_id="criticalAssetManagement",
)
report.status = "FAIL"
report.status_extended = (
"Unable to query Critical Asset Management status. "
"Verify that ThreatHunting.Read.All permission is granted."
)
findings.append(report)
return findings
if not pending_approvals:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Critical Asset Management",
resource_id="criticalAssetManagement",
)
report.status = "PASS"
report.status_extended = "No pending approvals for Critical Asset Management classifications are found."
findings.append(report)
else:
for approval in pending_approvals:
report = CheckReportM365(
metadata=self.metadata(),
resource=approval,
resource_name=f"CAM Classification: {approval.classification}",
resource_id=f"cam/{approval.classification}",
)
report.status = "FAIL"
assets_summary = ", ".join(approval.assets[:5])
if len(approval.assets) > 5:
assets_summary += f" and {len(approval.assets) - 5} more"
report.status_extended = (
f"Critical Asset Management classification '{approval.classification}' "
f"has {approval.pending_count} asset(s) pending approval: {assets_summary}."
)
findings.append(report)
return findings

View File

@@ -1,39 +0,0 @@
{
"Provider": "m365",
"CheckID": "defenderxdr_endpoint_privileged_user_exposed_credentials",
"CheckTitle": "Privileged users do not have credentials exposed on vulnerable endpoints",
"CheckType": [],
"ServiceName": "defenderxdr",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "critical",
"ResourceType": "Exposure Management",
"ResourceGroup": "security",
"Description": "Privileged users may have authentication artifacts (CLI secrets, cookies, tokens) exposed on endpoints with high risk scores. Microsoft Defender XDR's Security Exposure Management detects when credentials from users with Entra ID privileged roles are present on vulnerable devices.",
"Risk": "Exposed credentials on vulnerable endpoints enable account takeover through stolen tokens or cookies, Conditional Access bypass via primary refresh tokens, lateral movement to sensitive resources, and persistence until tokens are explicitly revoked.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://learn.microsoft.com/en-us/security-exposure-management/prerequisites",
"https://learn.microsoft.com/en-us/defender-xdr/advanced-hunting-exposuregraphedges-table"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Navigate to Microsoft Defender portal at https://security.microsoft.com\n2. Go to Exposure Management > Attack surface > Attack paths\n3. Review the exposed credential findings for privileged users\n4. For each affected device, review the risk and exposure score in Device Inventory\n5. Remediate endpoint vulnerabilities and improve device security posture\n6. Revoke affected user sessions and rotate credentials\n7. Consider implementing Privileged Access Workstations (PAWs) for privileged users",
"Terraform": ""
},
"Recommendation": {
"Text": "Privileged users should only authenticate from secure, hardened devices with low exposure scores. Implement Privileged Access Workstations (PAWs) and enforce device compliance policies through Conditional Access to prevent credential exposure on vulnerable endpoints.",
"Url": "https://hub.prowler.com/check/defenderxdr_endpoint_privileged_user_exposed_credentials"
}
},
"Categories": [
"secrets",
"identity-access",
"e5"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check requires Microsoft Defender XDR with Security Exposure Management enabled. The ThreatHunting.Read.All permission is required to query the ExposureGraphEdges table via the Advanced Hunting API."
}

View File

@@ -1,148 +0,0 @@
"""Check for exposed credentials of privileged users in Defender XDR.
This check identifies privileged users whose authentication credentials
(CLI secrets, cookies, tokens) are exposed on vulnerable endpoints.
"""
from prowler.lib.check.models import Check, CheckReportM365
from prowler.providers.m365.services.defenderxdr.defenderxdr_client import (
defenderxdr_client,
)
class defenderxdr_endpoint_privileged_user_exposed_credentials(Check):
"""Check if privileged users have exposed credentials on endpoints.
This check queries Microsoft Defender XDR's ExposureGraphEdges
table via the Advanced Hunting API to identify privileged users whose
authentication artifacts (CLI secrets, user cookies, sensitive tokens)
are exposed on endpoints with high risk or exposure scores.
Prerequisites:
1. ThreatHunting.Read.All permission granted
2. Microsoft Defender for Endpoint (MDE) enabled and deployed on devices
Results:
- PASS: No exposed credentials found OR MDE enabled but no devices
- FAIL: Exposed credentials detected OR MDE not enabled (blind spot)
"""
def execute(self) -> list[CheckReportM365]:
"""Execute the check for exposed credentials of privileged users.
Returns:
List[CheckReportM365]: A list of reports with check results.
"""
findings = []
# Step 1: Check MDE status
mde_status = defenderxdr_client.mde_status
# API call failed - likely missing ThreatHunting.Read.All permission
if mde_status is None:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender XDR",
resource_id="mdeStatus",
)
report.status = "FAIL"
report.status_extended = (
"Unable to query Microsoft Defender XDR status. "
"Verify that ThreatHunting.Read.All permission is granted."
)
findings.append(report)
return findings
# MDE not enabled - this is a security blind spot
if mde_status == "not_enabled":
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender XDR",
resource_id="mdeStatus",
)
report.status = "FAIL"
report.status_extended = (
"Microsoft Defender for Endpoint is not enabled. "
"Without MDE there is no visibility into credential "
"exposure on endpoints."
)
findings.append(report)
return findings
# MDE enabled but no devices - PASS (no endpoints to evaluate)
if mde_status == "no_devices":
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender XDR",
resource_id="mdeDevices",
)
report.status = "PASS"
report.status_extended = (
"Microsoft Defender for Endpoint is enabled but no devices "
"are onboarded. No endpoints to evaluate for credential "
"exposure."
)
findings.append(report)
return findings
# Step 2: MDE is active with devices - check for exposed credentials
exposed_credentials = defenderxdr_client.exposed_credentials_privileged_users
# API call failed for exposed credentials query
if exposed_credentials is None:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender XDR",
resource_id="exposedCredentials",
)
report.status = "FAIL"
report.status_extended = (
"Unable to query Security Exposure Management for exposed "
"credentials. Verify that Security Exposure Management "
"is enabled."
)
findings.append(report)
return findings
# Found exposed credentials - report each one
if exposed_credentials:
for exposed_user in exposed_credentials:
report = CheckReportM365(
metadata=self.metadata(),
resource=exposed_user,
resource_name=exposed_user.target_node_name,
resource_id=(exposed_user.target_node_id or exposed_user.edge_id),
)
report.status = "FAIL"
credential_info = (
f" ({exposed_user.credential_type})"
if exposed_user.credential_type
else ""
)
report.status_extended = (
f"Privileged user {exposed_user.target_node_name} has "
f"exposed credentials{credential_info} on device "
f"{exposed_user.source_node_name}."
)
findings.append(report)
else:
# No exposed credentials found - full visibility, no risk detected
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender XDR Exposure Management",
resource_id="exposedCredentials",
)
report.status = "PASS"
report.status_extended = (
"No exposed credentials found for privileged users on "
"vulnerable endpoints."
)
findings.append(report)
return findings

View File

@@ -1,322 +0,0 @@
"""Microsoft Defender XDR service module.
This module provides access to Microsoft Defender XDR data
through the Microsoft Graph Security Advanced Hunting API.
"""
import asyncio
import json
from typing import Dict, List, Optional
from msgraph.generated.security.microsoft_graph_security_run_hunting_query.run_hunting_query_post_request_body import (
RunHuntingQueryPostRequestBody,
)
from pydantic.v1 import BaseModel
from prowler.lib.logger import logger
from prowler.providers.m365.lib.service.service import M365Service
from prowler.providers.m365.m365_provider import M365Provider
class DefenderXDR(M365Service):
"""Microsoft Defender XDR service class.
Provides access to Microsoft Defender XDR data through
the Microsoft Graph Security Advanced Hunting API.
This class handles endpoint security checks including:
- Device security posture
- Exposed credentials detection
- Vulnerability assessments
- Critical Asset Management approvals
Attributes:
mde_status: Status of MDE deployment
(None, "not_enabled", "no_devices", "active")
exposed_credentials_privileged_users: List of privileged users
with exposed credentials
pending_cam_approvals: List of pending Critical Asset Management
approvals (None if API error)
"""
def __init__(self, provider: M365Provider):
"""Initialize the DefenderXDR service client.
Args:
provider: The M365Provider instance for authentication.
"""
super().__init__(provider)
# MDE status: None = API error, "not_enabled" = table not found,
# "no_devices" = enabled but empty, "active" = has devices
self.mde_status: Optional[str] = None
# Check data
self.exposed_credentials_privileged_users: Optional[
List[ExposedCredentialPrivilegedUser]
] = []
self.pending_cam_approvals: Optional[List[PendingCAMApproval]] = []
loop = self._get_event_loop()
try:
(
self.mde_status,
self.exposed_credentials_privileged_users,
self.pending_cam_approvals,
) = loop.run_until_complete(
asyncio.gather(
self._check_mde_status(),
self._get_exposed_credentials_privileged_users(),
self._get_pending_cam_approvals(),
)
)
finally:
self._cleanup_event_loop(loop)
def _get_event_loop(self) -> asyncio.AbstractEventLoop:
"""Get or create an event loop for async operations."""
try:
loop = asyncio.get_running_loop()
if loop.is_running():
raise RuntimeError(
"Cannot initialize DefenderXDR service while event loop is running"
)
return loop
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
def _cleanup_event_loop(self, loop: asyncio.AbstractEventLoop) -> None:
"""Clean up the event loop if we created it."""
try:
if loop and not loop.is_running():
asyncio.set_event_loop(None)
loop.close()
except Exception as error:
# Best-effort cleanup: swallow errors but log them for diagnostics
logger.debug(f"DefenderXDR - Failed to clean up event loop: {error}")
async def _run_hunting_query(self, query: str) -> tuple[Optional[List[Dict]], bool]:
"""Execute an Advanced Hunting query using Microsoft Graph Security API.
Args:
query: The KQL (Kusto Query Language) query to execute.
Returns:
Tuple of (results, table_not_found):
- results: List of result dicts, empty list if no results,
None if API error.
- table_not_found: True if query failed because table
doesn't exist.
"""
try:
request_body = RunHuntingQueryPostRequestBody(query=query)
response = await self.client.security.microsoft_graph_security_run_hunting_query.post(
request_body
)
if not response or not response.results:
return [], False
results = [
row.additional_data
for row in response.results
if hasattr(row, "additional_data")
]
return results, False
except Exception as error:
error_message = str(error).lower()
if (
"failed to resolve table" in error_message
or "could not find table" in error_message
):
logger.warning(f"DefenderXDR - Table not found in query: {error}")
return [], True
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return None, False
async def _check_mde_status(self) -> Optional[str]:
"""Check Microsoft Defender for Endpoint status.
Returns:
- None: API call failed (permission issue)
- "not_enabled": DeviceInfo table doesn't exist (MDE not enabled)
- "no_devices": MDE enabled but no devices onboarded
- "active": MDE enabled with devices reporting
"""
logger.info("DefenderXDR - Checking MDE status...")
query = "DeviceInfo | summarize DeviceCount = count()"
results, table_not_found = await self._run_hunting_query(query)
if results is None:
return None
if table_not_found:
return "not_enabled"
if results and len(results) > 0:
device_count = results[0].get("DeviceCount", 0)
if device_count > 0:
return "active"
return "no_devices"
async def _get_exposed_credentials_privileged_users(
self,
) -> Optional[List["ExposedCredentialPrivilegedUser"]]:
"""Query for privileged users with exposed credentials.
Returns:
List of ExposedCredentialPrivilegedUser objects,
or None if API call failed.
"""
logger.info(
"DefenderXDR - Querying for exposed credentials of privileged users..."
)
query = """
ExposureGraphEdges
| where EdgeLabel == "hasCredentialsFor"
| where TargetNodeLabel == "user"
| extend targetCategories = parse_json(TargetNodeCategories)
| where targetCategories has "PrivilegedEntraIdRole" or targetCategories has "privileged"
| extend credentialType = tostring(parse_json(EdgeProperties).credentialType)
| project
EdgeId,
SourceNodeId,
SourceNodeName,
SourceNodeLabel,
TargetNodeId,
TargetNodeName,
TargetNodeLabel,
CredentialType = credentialType,
TargetCategories = TargetNodeCategories
"""
results, _ = await self._run_hunting_query(query)
if results is None:
return None
return [self._parse_exposed_credential(row) for row in results if row]
def _parse_exposed_credential(self, row: Dict) -> "ExposedCredentialPrivilegedUser":
"""Parse a single row into an ExposedCredentialPrivilegedUser."""
target_categories = row.get("TargetCategories", [])
if isinstance(target_categories, str):
try:
target_categories = json.loads(target_categories)
except (json.JSONDecodeError, ValueError):
target_categories = []
return ExposedCredentialPrivilegedUser(
edge_id=str(row.get("EdgeId", "")),
source_node_id=str(row.get("SourceNodeId", "")),
source_node_name=str(row.get("SourceNodeName", "Unknown")),
source_node_label=str(row.get("SourceNodeLabel", "")),
target_node_id=str(row.get("TargetNodeId", "")),
target_node_name=str(row.get("TargetNodeName", "Unknown")),
target_node_label=str(row.get("TargetNodeLabel", "")),
credential_type=str(row.get("CredentialType") or "Unknown"),
target_categories=target_categories,
)
async def _get_pending_cam_approvals(
self,
) -> Optional[List["PendingCAMApproval"]]:
"""Query for pending Critical Asset Management approvals.
Queries the ExposureGraphNodes table to find assets with low criticality
confidence scores that require administrator approval.
Returns:
List of PendingCAMApproval objects, or None if API call failed.
"""
logger.info(
"DefenderXDR - Querying for pending Critical Asset Management approvals..."
)
query = """
ExposureGraphNodes
| where isnotempty(parse_json(NodeProperties)['rawData']['criticalityConfidenceLow'])
| mv-expand parse_json(NodeProperties)['rawData']['criticalityConfidenceLow']
| extend Classification = tostring(NodeProperties_rawData_criticalityConfidenceLow)
| summarize PendingApproval = count(), Assets = array_sort_asc(make_set(NodeName)) by Classification
| sort by Classification asc
"""
results, _ = await self._run_hunting_query(query)
if results is None:
return None
pending_approvals = []
for row in results:
if not row:
continue
classification = row.get("Classification", "")
pending_count = int(row.get("PendingApproval", 0))
assets_raw = row.get("Assets", "[]")
if isinstance(assets_raw, str):
try:
assets = json.loads(assets_raw)
except (json.JSONDecodeError, ValueError):
assets = []
elif isinstance(assets_raw, list):
assets = assets_raw
else:
assets = []
pending_approvals.append(
PendingCAMApproval(
classification=classification,
pending_count=pending_count,
assets=assets,
)
)
return pending_approvals
class ExposedCredentialPrivilegedUser(BaseModel):
"""Model for exposed credential data of a privileged user.
Represents authentication credentials (CLI secrets, user cookies, tokens)
of privileged users that are exposed on vulnerable endpoints.
"""
edge_id: str
source_node_id: str
source_node_name: str
source_node_label: str
target_node_id: str
target_node_name: str
target_node_label: str
credential_type: Optional[str] = None
target_categories: list = []
class PendingCAMApproval(BaseModel):
"""Model for a pending Critical Asset Management approval classification.
Represents assets with low criticality confidence scores that require
security administrator review and approval.
Attributes:
classification: The asset classification name pending approval.
pending_count: The number of assets pending approval for this classification.
assets: List of asset names pending approval.
"""
classification: str
pending_count: int
assets: List[str]

View File

@@ -1,37 +0,0 @@
{
"Provider": "m365",
"CheckID": "entra_app_registration_no_unused_privileged_permissions",
"CheckTitle": "App registration has no unused privileged API permissions",
"CheckType": [],
"ServiceName": "entra",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "App Registration",
"ResourceGroup": "IAM",
"Description": "OAuth app registrations with privileged API permissions (High privilege level) that are not being actively used. Usage status is determined by Microsoft Defender for Cloud Apps App Governance.",
"Risk": "Unused privileged permissions expand the attack surface. If a compromised app has dormant privileged permissions, attackers can exploit them for **privilege escalation**, **unauthorized access** to sensitive data, or **lateral movement** within the environment.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://learn.microsoft.com/en-us/defender-cloud-apps/app-governance-visibility-insights-overview",
"https://learn.microsoft.com/en-us/defender-xdr/advanced-hunting-oauthappinfo-table"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Navigate to Microsoft Defender XDR portal (https://security.microsoft.com)\n2. Go to Cloud apps > App governance > Overview\n3. Review the Applications inventory for apps with unused permissions\n4. For each flagged app, view details and navigate to the Permissions tab\n5. Remove unnecessary permissions via Microsoft Entra admin center",
"Terraform": ""
},
"Recommendation": {
"Text": "Apply the **principle of least privilege** by regularly reviewing and revoking unused privileged permissions from app registrations. Use Microsoft Defender for Cloud Apps App Governance to monitor permission usage.",
"Url": "https://hub.prowler.com/check/entra_app_registration_no_unused_privileged_permissions"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check requires Microsoft Defender for Cloud Apps with App Governance enabled and ThreatHunting.Read.All permission. If App Governance data is unavailable, the check fails due to missing visibility."
}

View File

@@ -1,145 +0,0 @@
from prowler.lib.check.models import Check, CheckReportM365
from prowler.providers.m365.services.entra.entra_client import entra_client
class entra_app_registration_no_unused_privileged_permissions(Check):
"""
Ensure that app registrations do not have unused privileged API permissions.
This check evaluates OAuth applications registered in Microsoft Entra ID to identify
those with privileged API permissions (High privilege level or Control/Management Plane
classifications) that are assigned but not actively being used.
The check uses data from Microsoft Defender for Cloud Apps App Governance via
the OAuthAppInfo table in Defender XDR Advanced Hunting.
- PASS: The app has no unused privileged permissions.
- FAIL: The app has one or more unused privileged permissions that should be revoked.
It also fails when OAuth App Governance data is not available.
"""
# InUse field values from OAuthAppInfo:
# - "true" / "1" / "True" = permission is actively used
# - "false" / "0" / "False" = permission is NOT used (this triggers FAIL)
# - "Not supported" = Microsoft cannot determine usage
# - "" (empty) = No tracking data available
# Note: Microsoft is changing from numeric (1/0) to textual (True/False) on Feb 25, 2026
_UNUSED_STATUSES = {"false", "0", "notinuse", "not in use"}
_PRIVILEGED_PLANE_LABELS = ("control plane", "management plane")
def execute(self) -> list[CheckReportM365]:
"""
Execute the unused privileged permissions check for app registrations.
Iterates over OAuth applications retrieved from the Entra client and generates
reports indicating whether each app has unused privileged permissions.
Returns:
list[CheckReportM365]: A list of reports with the result of the check for each app.
"""
findings = []
# If OAuth app data is None, the API call failed (missing permissions or App Governance not enabled)
if entra_client.oauth_apps is None:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="OAuth Applications",
resource_id="oauthApps",
)
report.status = "FAIL"
report.status_extended = (
"OAuth App Governance data is unavailable. "
"Enable App Governance in Microsoft Defender for Cloud Apps and "
"grant ThreatHunting.Read.All to evaluate unused privileged permissions."
)
findings.append(report)
return findings
# If OAuth apps is empty dict, no apps are registered - this is compliant
if not entra_client.oauth_apps:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="OAuth Applications",
resource_id="oauthApps",
)
report.status = "PASS"
report.status_extended = (
"No OAuth applications are registered in the tenant."
)
findings.append(report)
return findings
# Check each OAuth app for unused privileged permissions
for app_id, app in entra_client.oauth_apps.items():
report = CheckReportM365(
metadata=self.metadata(),
resource=app,
resource_name=app.name,
resource_id=app_id,
)
# Find unused privileged permissions
# A permission is considered privileged if it has:
# - PrivilegeLevel == "High"
# Or if it's part of Control Plane / Management Plane (typically High privilege)
unused_privileged_permissions = []
for permission in app.permissions:
# Check if the permission is privileged
is_privileged = self._is_privileged_permission(permission)
# Check if the permission is unused
normalized_usage = self._normalize(permission.usage_status)
is_unused = normalized_usage in self._UNUSED_STATUSES
if is_privileged and is_unused:
unused_privileged_permissions.append(permission.name)
if unused_privileged_permissions:
# The app has unused privileged permissions
report.status = "FAIL"
# Truncate list to first 5 permissions for readability
total_count = len(unused_privileged_permissions)
if total_count > 5:
displayed = unused_privileged_permissions[:5]
permissions_list = ", ".join(displayed)
remaining = total_count - 5
permissions_list += f" (and {remaining} more)"
else:
permissions_list = ", ".join(unused_privileged_permissions)
report.status_extended = (
f"App registration {app.name} has {total_count} "
f"unused privileged permission(s): {permissions_list}."
)
else:
# The app has no unused privileged permissions
report.status = "PASS"
report.status_extended = (
f"App registration {app.name} has no unused privileged permissions."
)
findings.append(report)
return findings
@classmethod
def _is_privileged_permission(cls, permission) -> bool:
privilege_level = cls._normalize(permission.privilege_level)
permission_type = cls._normalize(permission.permission_type)
classification = cls._normalize(getattr(permission, "classification", ""))
if privilege_level == "high":
return True
return any(
label in permission_type or label in classification
for label in cls._PRIVILEGED_PLANE_LABELS
)
@staticmethod
def _normalize(value: str) -> str:
return (
value.lower().replace("_", " ").replace("-", " ").strip() if value else ""
)

View File

@@ -1,38 +0,0 @@
{
"Provider": "m365",
"CheckID": "entra_seamless_sso_disabled",
"CheckTitle": "Entra hybrid deployment does not have Seamless SSO enabled",
"CheckType": [],
"ServiceName": "entra",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Directory Sync Settings",
"ResourceGroup": "IAM",
"Description": "**Seamless Single Sign-On (SSO)** in hybrid Microsoft Entra deployments allows automatic authentication for domain-joined devices on the corporate network.\n\nThis check verifies the actual Seamless SSO configuration in directory synchronization settings. Modern devices with **Primary Refresh Token** (PRT) support no longer require Seamless SSO.",
"Risk": "Seamless SSO can be exploited for **lateral movement** between on-premises domains and Entra ID when an Entra Connect server is compromised. It can also be used to perform **brute force attacks** against Entra ID, as authentication through the AZUREADSSOACC account bypasses standard protections.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://learn.microsoft.com/en-us/entra/identity/hybrid/connect/how-to-connect-sso"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Open Microsoft Entra Connect configuration tool on the on-premises server.\n2. Navigate to **Change User Sign In**.\n3. Uncheck **Enable single sign-on**.\n4. Complete the configuration wizard.\n5. In Active Directory, run `Get-AzureADSSOStatus` to verify Seamless SSO shows `\"enable\":false`.\n6. Run `Disable-AzureADSSOForest` with domain admin credentials to remove the AZUREADSSOACC account.",
"Terraform": ""
},
"Recommendation": {
"Text": "Disable **Seamless SSO** in hybrid environments where modern devices support *Primary Refresh Token (PRT)*. Regularly audit Entra Connect settings and verify that the AZUREADSSOACC computer account is removed from Active Directory.",
"Url": "https://hub.prowler.com/check/entra_seamless_sso_disabled"
}
},
"Categories": [
"e3"
],
"DependsOn": [],
"RelatedTo": [
"entra_password_hash_sync_enabled"
],
"Notes": "Applies only to hybrid Microsoft Entra deployments using Entra Connect sync. The check reads the seamless_sso_enabled flag from the directory on-premises synchronization settings via Microsoft Graph API."
}

View File

@@ -1,82 +0,0 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportM365
from prowler.providers.m365.services.entra.entra_client import entra_client
class entra_seamless_sso_disabled(Check):
"""Check that Seamless Single Sign-On (SSO) is disabled for Microsoft Entra hybrid deployments.
Seamless SSO allows users to sign in without typing their passwords when on
corporate devices connected to the corporate network. When an Entra Connect server
is compromised, Seamless SSO can enable lateral movement between on-premises domains
and Entra ID, and it can also be exploited for brute force attacks. Modern devices with
Primary Refresh Token (PRT) support make this feature unnecessary for most organizations.
- PASS: Seamless SSO is disabled or on-premises sync is not enabled (cloud-only).
- FAIL: Seamless SSO is enabled in a hybrid deployment, or cannot verify due to insufficient permissions.
"""
def execute(self) -> List[CheckReportM365]:
"""Execute the Seamless SSO disabled check.
Checks the directory sync settings to determine if Seamless SSO is enabled.
For hybrid environments, this check verifies the actual Seamless SSO configuration
rather than inferring from on-premises sync status.
Returns:
A list of CheckReportM365 objects with the result of the check.
"""
findings = []
# Check if there was an error retrieving directory sync settings
if entra_client.directory_sync_error:
for organization in entra_client.organizations:
report = CheckReportM365(
self.metadata(),
resource=organization,
resource_id=organization.id,
resource_name=organization.name,
)
# Only FAIL for hybrid orgs; cloud-only orgs don't need this permission
if organization.on_premises_sync_enabled:
report.status = "FAIL"
report.status_extended = f"Cannot verify Seamless SSO status for {organization.name}: {entra_client.directory_sync_error}."
else:
report.status = "PASS"
report.status_extended = f"Entra organization {organization.name} is cloud-only (no on-premises sync), Seamless SSO is not applicable."
findings.append(report)
return findings
# Process directory sync settings if available
for sync_settings in entra_client.directory_sync_settings:
report = CheckReportM365(
self.metadata(),
resource=sync_settings,
resource_id=sync_settings.id,
resource_name=f"Directory Sync {sync_settings.id}",
)
if sync_settings.seamless_sso_enabled:
report.status = "FAIL"
report.status_extended = f"Entra directory sync {sync_settings.id} has Seamless SSO enabled, which can be exploited for lateral movement and brute force attacks."
else:
report.status = "PASS"
report.status_extended = f"Entra directory sync {sync_settings.id} has Seamless SSO disabled."
findings.append(report)
# If no directory sync settings and no error, it's a cloud-only tenant
if not entra_client.directory_sync_settings:
for organization in entra_client.organizations:
report = CheckReportM365(
self.metadata(),
resource=organization,
resource_id=organization.id,
resource_name=organization.name,
)
report.status = "PASS"
report.status_extended = f"Entra organization {organization.name} is cloud-only (no on-premises sync), Seamless SSO is not applicable."
findings.append(report)
return findings

View File

@@ -1,14 +1,9 @@
import asyncio
import json
from asyncio import gather
from enum import Enum
from typing import Dict, List, Optional
from typing import List, Optional
from uuid import UUID
from msgraph.generated.models.o_data_errors.o_data_error import ODataError
from msgraph.generated.security.microsoft_graph_security_run_hunting_query.run_hunting_query_post_request_body import (
RunHuntingQueryPostRequestBody,
)
from pydantic.v1 import BaseModel
from prowler.lib.logger import logger
@@ -17,33 +12,7 @@ from prowler.providers.m365.m365_provider import M365Provider
class Entra(M365Service):
"""
Microsoft Entra ID service class.
This class provides methods to retrieve and manage Microsoft Entra ID
security policies and configurations, including authorization policies,
conditional access policies, admin consent policies, groups, organizations,
users, and OAuth application data from Defender XDR.
Attributes:
tenant_domain (str): The tenant domain.
authorization_policy (AuthorizationPolicy): The authorization policy.
conditional_access_policies (dict): Dictionary of conditional access policies.
admin_consent_policy (AdminConsentPolicy): The admin consent policy.
groups (list): List of groups.
organizations (list): List of organizations.
users (dict): Dictionary of users.
user_accounts_status (dict): Dictionary of user account statuses.
oauth_apps (dict): Dictionary of OAuth applications from Defender XDR.
"""
def __init__(self, provider: M365Provider):
"""
Initialize the Entra service client.
Args:
provider: The M365Provider instance for authentication and configuration.
"""
super().__init__(provider)
if self.powershell:
@@ -78,8 +47,6 @@ class Entra(M365Service):
self._get_groups(),
self._get_organization(),
self._get_users(),
self._get_oauth_apps(),
self._get_directory_sync_settings(),
)
)
@@ -89,8 +56,6 @@ class Entra(M365Service):
self.groups = attributes[3]
self.organizations = attributes[4]
self.users = attributes[5]
self.oauth_apps: Optional[Dict[str, OAuthApp]] = attributes[6]
self.directory_sync_settings, self.directory_sync_error = attributes[7]
self.user_accounts_status = {}
if created_loop:
@@ -414,57 +379,6 @@ class Entra(M365Service):
return organizations
async def _get_directory_sync_settings(self):
"""Retrieve on-premises directory synchronization settings.
Fetches the directory synchronization configuration from Microsoft Graph API
to determine the state of synchronization features such as password sync,
device writeback, and other hybrid identity settings.
Returns:
A tuple containing:
- A list of DirectorySyncSettings objects, or an empty list if retrieval fails.
- An error message string if there was an access error, None otherwise.
"""
logger.info("Entra - Getting directory sync settings...")
directory_sync_settings = []
error_message = None
try:
sync_data = await self.client.directory.on_premises_synchronization.get()
for sync in getattr(sync_data, "value", []) or []:
features = getattr(sync, "features", None)
directory_sync_settings.append(
DirectorySyncSettings(
id=sync.id,
password_sync_enabled=getattr(
features, "password_sync_enabled", False
)
or False,
seamless_sso_enabled=getattr(
features, "seamless_sso_enabled", False
)
or False,
)
)
except ODataError as error:
error_code = getattr(error.error, "code", None) if error.error else None
if error_code == "Authorization_RequestDenied":
error_message = "Insufficient privileges to read directory sync settings. Required permission: OnPremDirectorySynchronization.Read.All or OnPremDirectorySynchronization.ReadWrite.All"
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error_message}"
)
else:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
error_message = str(error)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
error_message = str(error)
return directory_sync_settings, error_message
async def _get_users(self):
logger.info("Entra - Getting users...")
users = {}
@@ -547,122 +461,6 @@ class Entra(M365Service):
return registration_details
async def _get_oauth_apps(self) -> Optional[Dict[str, "OAuthApp"]]:
"""
Retrieve OAuth applications from Defender XDR using Advanced Hunting.
This method queries the OAuthAppInfo table to get information about
OAuth applications registered in the tenant, including their permissions
and usage status.
Returns:
Optional[Dict[str, OAuthApp]]: Dictionary of OAuth applications keyed by app ID,
or None if the API call failed (missing permissions or App Governance not enabled).
"""
logger.info("Entra - Getting OAuth apps from Defender XDR...")
oauth_apps: Optional[Dict[str, OAuthApp]] = {}
try:
# Query the OAuthAppInfo table using Advanced Hunting
# The query gets apps with their permissions including usage status
query = """
OAuthAppInfo
| project OAuthAppId, AppName, AppStatus, PrivilegeLevel, Permissions,
ServicePrincipalId, IsAdminConsented, LastUsedTime, AppOrigin
"""
request_body = RunHuntingQueryPostRequestBody(query=query)
result = await self.client.security.microsoft_graph_security_run_hunting_query.post(
request_body
)
if result and result.results:
for row in result.results:
row_data = row.additional_data
raw_app_id = row_data.get("OAuthAppId", "")
# Convert to string in case API returns non-string type
app_id = str(raw_app_id) if raw_app_id else ""
if not app_id:
continue
# Parse the permissions array
# Permissions can be a list of JSON strings or a list of dicts
permissions = []
raw_permissions = row_data.get("Permissions", [])
if raw_permissions:
for perm in raw_permissions:
# Parse JSON string if needed
if isinstance(perm, str):
try:
perm = json.loads(perm)
except json.JSONDecodeError:
continue
if isinstance(perm, dict):
permissions.append(
OAuthAppPermission(
name=str(perm.get("PermissionValue", "")),
target_app_id=str(perm.get("TargetAppId", "")),
target_app_name=str(
perm.get("TargetAppDisplayName", "")
),
permission_type=str(
perm.get("PermissionType", "")
),
classification=str(
perm.get(
"Classification",
perm.get(
"PermissionClassification", ""
),
)
),
privilege_level=str(
perm.get("PrivilegeLevel", "")
),
usage_status=str(perm.get("InUse", "")),
)
)
# Convert values to strings to handle API returning non-string types
raw_service_principal_id = row_data.get("ServicePrincipalId", "")
service_principal_id = (
str(raw_service_principal_id)
if raw_service_principal_id
else ""
)
raw_last_used_time = row_data.get("LastUsedTime")
last_used_time = (
str(raw_last_used_time)
if raw_last_used_time is not None
else None
)
oauth_apps[app_id] = OAuthApp(
id=app_id,
name=str(row_data.get("AppName", "")),
status=str(row_data.get("AppStatus", "")),
privilege_level=str(row_data.get("PrivilegeLevel", "")),
permissions=permissions,
service_principal_id=service_principal_id,
is_admin_consented=bool(
row_data.get("IsAdminConsented", False)
),
last_used_time=last_used_time,
app_origin=str(row_data.get("AppOrigin", "")),
)
except Exception as error:
# Log the error and return None to indicate API failure
# This API requires ThreatHunting.Read.All permission and App Governance to be enabled
logger.warning(
f"Entra - Could not retrieve OAuth apps from Defender XDR. "
f"This requires ThreatHunting.Read.All permission and App Governance enabled. "
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return None
return oauth_apps
class ConditionalAccessPolicyState(Enum):
ENABLED = "enabled"
@@ -801,19 +599,6 @@ class Organization(BaseModel):
on_premises_sync_enabled: bool
class DirectorySyncSettings(BaseModel):
"""On-premises directory synchronization settings.
Represents the synchronization configuration for a tenant, including feature
flags that control hybrid identity behaviors such as password synchronization
and Seamless SSO.
"""
id: str
password_sync_enabled: bool = False
seamless_sso_enabled: bool = False
class Group(BaseModel):
id: str
name: str
@@ -866,53 +651,3 @@ class AuthPolicyRoles(Enum):
USER = UUID("a0b1b346-4d3e-4e8b-98f8-753987be4970")
GUEST_USER = UUID("10dae51f-b6af-4016-8d66-8c2a99b929b3")
GUEST_USER_ACCESS_RESTRICTED = UUID("2af84b1e-32c8-42b7-82bc-daa82404023b")
class OAuthAppPermission(BaseModel):
"""
Model for OAuth application permission.
Attributes:
name: The permission name.
target_app_id: The target application ID that provides this permission.
target_app_name: The target application display name.
permission_type: The type of permission (Application or Delegated).
classification: Optional plane classification (e.g. Control Plane, Management Plane).
privilege_level: The privilege level (High, Medium, Low).
usage_status: The usage status (InUse or NotInUse).
"""
name: str
target_app_id: str = ""
target_app_name: str = ""
permission_type: str = ""
classification: str = ""
privilege_level: str = ""
usage_status: str = ""
class OAuthApp(BaseModel):
"""
Model for OAuth application from Defender XDR.
Attributes:
id: The application ID.
name: The application display name.
status: The application status (Enabled, Disabled, etc.).
privilege_level: The overall privilege level of the app.
permissions: List of permissions assigned to the app.
service_principal_id: The service principal ID.
is_admin_consented: Whether the app has admin consent.
last_used_time: When the app was last used.
app_origin: Whether the app is internal or external.
"""
id: str
name: str
status: str = ""
privilege_level: str = ""
permissions: List[OAuthAppPermission] = []
service_principal_id: str = ""
is_admin_consented: bool = False
last_used_time: Optional[str] = None
app_origin: str = ""

View File

@@ -53,7 +53,7 @@ class TestImageProvider:
assert provider._type == "image"
assert provider.type == "image"
assert provider.images == ["alpine:3.18"]
assert provider.scanners == ["vuln", "secret"]
assert provider.scanners == ["vuln", "secret", "misconfig"]
assert provider.image_config_scanners == []
assert provider.trivy_severity == []
assert provider.ignore_unfixed is False
@@ -616,6 +616,99 @@ class TestExtractRegistry:
assert ImageProvider._extract_registry("nginx") is None
class TestIsRegistryUrl:
def test_bare_ecr_hostname(self):
assert ImageProvider._is_registry_url(
"714274078102.dkr.ecr.eu-west-1.amazonaws.com"
)
def test_bare_hostname_with_port(self):
assert ImageProvider._is_registry_url("myregistry.com:5000")
def test_bare_ghcr(self):
assert ImageProvider._is_registry_url("ghcr.io")
def test_registry_with_namespace_only(self):
"""Registry URL with a single path segment (no tag) is a registry URL."""
assert ImageProvider._is_registry_url("ghcr.io/myorg")
def test_image_reference_not_registry(self):
"""Full image reference with repo and tag is not a registry URL."""
assert not ImageProvider._is_registry_url("ghcr.io/myorg/repo:tag")
def test_simple_image_name(self):
assert not ImageProvider._is_registry_url("alpine:3.18")
def test_bare_image_no_tag(self):
assert not ImageProvider._is_registry_url("nginx")
def test_dockerhub_namespace(self):
assert not ImageProvider._is_registry_url("library/alpine")
class TestTestRegistryConnection:
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_registry_connection_success(self, mock_factory):
"""Test that a bare hostname triggers registry catalog test."""
mock_adapter = MagicMock()
mock_adapter.list_repositories.return_value = ["repo1"]
mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(
image="714274078102.dkr.ecr.eu-west-1.amazonaws.com",
registry_username="user",
registry_password="pass",
)
assert result.is_connected is True
mock_factory.assert_called_once_with(
registry_url="714274078102.dkr.ecr.eu-west-1.amazonaws.com",
username="user",
password="pass",
token=None,
)
mock_adapter.list_repositories.assert_called_once()
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_registry_connection_auth_failure(self, mock_factory):
"""Test that 401 from registry adapter returns auth failure."""
mock_adapter = MagicMock()
mock_adapter.list_repositories.side_effect = Exception("401 unauthorized")
mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(
image="714274078102.dkr.ecr.eu-west-1.amazonaws.com",
)
assert result.is_connected is False
assert "Authentication failed" in result.error
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_registry_connection_generic_error(self, mock_factory):
"""Test that a generic error from registry adapter returns error message."""
mock_adapter = MagicMock()
mock_adapter.list_repositories.side_effect = Exception("connection refused")
mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(
image="myregistry.example.com",
)
assert result.is_connected is False
assert "Failed to connect to registry" in result.error
@patch("subprocess.run")
def test_image_reference_still_uses_trivy(self, mock_subprocess):
"""Test that a full image reference still uses trivy (not registry catalog)."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
result = ImageProvider.test_connection(image="alpine:3.18")
assert result.is_connected is True
assert mock_subprocess.call_count == 1
assert mock_subprocess.call_args.args[0][0] == "trivy"
class TestTrivyAuthIntegration:
@patch("subprocess.run")
def test_run_scan_passes_trivy_env_with_credentials(self, mock_subprocess):

View File

@@ -1,218 +0,0 @@
from unittest import mock
from prowler.providers.m365.services.defenderxdr.defenderxdr_service import (
PendingCAMApproval,
)
from tests.providers.m365.m365_fixtures import DOMAIN, set_mocked_m365_provider
class Test_defenderxdr_critical_asset_management_pending_approvals:
"""Tests for the defenderxdr_critical_asset_management_pending_approvals check."""
def test_api_failed_missing_permission(self):
"""Test FAIL when API call fails (None): missing ThreatHunting.Read.All permission."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.pending_cam_approvals = None
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals import (
defenderxdr_critical_asset_management_pending_approvals,
)
check = defenderxdr_critical_asset_management_pending_approvals()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
"Unable to query Critical Asset Management" in result[0].status_extended
)
assert "ThreatHunting.Read.All" in result[0].status_extended
assert result[0].resource_id == "criticalAssetManagement"
def test_no_pending_approvals_pass(self):
"""Test PASS scenario when there are no pending CAM approvals."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.pending_cam_approvals = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals import (
defenderxdr_critical_asset_management_pending_approvals,
)
check = defenderxdr_critical_asset_management_pending_approvals()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "No pending approvals for Critical Asset Management classifications are found."
)
assert result[0].resource_name == "Critical Asset Management"
assert result[0].resource_id == "criticalAssetManagement"
assert result[0].resource == {}
def test_single_pending_approval_fail(self):
"""Test FAIL scenario when there is one pending CAM approval."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.pending_cam_approvals = [
PendingCAMApproval(
classification="HighValue",
pending_count=2,
assets=["server-01", "server-02"],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals import (
defenderxdr_critical_asset_management_pending_approvals,
)
check = defenderxdr_critical_asset_management_pending_approvals()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Critical Asset Management classification 'HighValue' has 2 asset(s) pending approval: server-01, server-02."
)
assert result[0].resource_name == "CAM Classification: HighValue"
assert result[0].resource_id == "cam/HighValue"
assert (
result[0].resource == defenderxdr_client.pending_cam_approvals[0].dict()
)
def test_multiple_pending_approvals_fail(self):
"""Test FAIL scenario when there are multiple pending CAM approvals."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.pending_cam_approvals = [
PendingCAMApproval(
classification="HighValue",
pending_count=1,
assets=["server-01"],
),
PendingCAMApproval(
classification="Critical",
pending_count=3,
assets=["db-01", "db-02", "db-03"],
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals import (
defenderxdr_critical_asset_management_pending_approvals,
)
check = defenderxdr_critical_asset_management_pending_approvals()
result = check.execute()
assert len(result) == 2
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Critical Asset Management classification 'HighValue' has 1 asset(s) pending approval: server-01."
)
assert result[0].resource_name == "CAM Classification: HighValue"
assert result[0].resource_id == "cam/HighValue"
assert result[1].status == "FAIL"
assert (
result[1].status_extended
== "Critical Asset Management classification 'Critical' has 3 asset(s) pending approval: db-01, db-02, db-03."
)
assert result[1].resource_name == "CAM Classification: Critical"
assert result[1].resource_id == "cam/Critical"
def test_pending_approval_with_more_than_five_assets_fail(self):
"""Test FAIL scenario with more than 5 assets to verify truncation."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.pending_cam_approvals = [
PendingCAMApproval(
classification="HighValue",
pending_count=7,
assets=[
"server-01",
"server-02",
"server-03",
"server-04",
"server-05",
"server-06",
"server-07",
],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals import (
defenderxdr_critical_asset_management_pending_approvals,
)
check = defenderxdr_critical_asset_management_pending_approvals()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Critical Asset Management classification 'HighValue' has 7 asset(s) pending approval: server-01, server-02, server-03, server-04, server-05 and 2 more."
)
assert result[0].resource_name == "CAM Classification: HighValue"
assert result[0].resource_id == "cam/HighValue"

View File

@@ -1,375 +0,0 @@
from unittest import mock
from tests.providers.m365.m365_fixtures import DOMAIN, set_mocked_m365_provider
class Test_defenderxdr_endpoint_privileged_user_exposed_credentials:
"""Tests for the defenderxdr_endpoint_privileged_user_exposed_credentials check."""
def test_mde_status_api_failed(self):
"""Test FAIL when MDE status API call fails (None): missing permission."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = None
defenderxdr_client.exposed_credentials_privileged_users = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "Unable to query Microsoft Defender XDR" in result[0].status_extended
assert "ThreatHunting.Read.All" in result[0].status_extended
assert result[0].resource_id == "mdeStatus"
def test_mde_not_enabled(self):
"""Test FAIL when MDE is not enabled - security blind spot."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "not_enabled"
defenderxdr_client.exposed_credentials_privileged_users = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
"Microsoft Defender for Endpoint is not enabled"
in result[0].status_extended
)
assert "no visibility" in result[0].status_extended
assert result[0].resource_id == "mdeStatus"
def test_mde_no_devices(self):
"""Test PASS when MDE is enabled but no devices are onboarded."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "no_devices"
defenderxdr_client.exposed_credentials_privileged_users = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert "enabled but no devices are onboarded" in result[0].status_extended
assert "No endpoints to evaluate" in result[0].status_extended
assert result[0].resource_id == "mdeDevices"
def test_exposed_credentials_query_failed(self):
"""Test FAIL when exposed credentials query fails (None)."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "active"
defenderxdr_client.exposed_credentials_privileged_users = None
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
"Unable to query Security Exposure Management"
in result[0].status_extended
)
assert result[0].resource_id == "exposedCredentials"
def test_no_exposed_credentials(self):
"""Test PASS when no privileged users have exposed credentials."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "active"
defenderxdr_client.exposed_credentials_privileged_users = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
"No exposed credentials found for privileged users"
in result[0].status_extended
)
assert result[0].resource_name == "Defender XDR Exposure Management"
assert result[0].resource_id == "exposedCredentials"
def test_single_exposed_credential_with_credential_type(self):
"""Test FAIL when a privileged user has exposed credentials with type."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "active"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
from prowler.providers.m365.services.defenderxdr.defenderxdr_service import (
ExposedCredentialPrivilegedUser,
)
exposed_user = ExposedCredentialPrivilegedUser(
edge_id="edge-123",
source_node_id="device-456",
source_node_name="WORKSTATION01",
source_node_label="device",
target_node_id="user-789",
target_node_name="admin@contoso.com",
target_node_label="user",
credential_type="CLI secret",
target_categories=["PrivilegedEntraIdRole"],
)
defenderxdr_client.exposed_credentials_privileged_users = [exposed_user]
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "admin@contoso.com" in result[0].status_extended
assert "CLI secret" in result[0].status_extended
assert "WORKSTATION01" in result[0].status_extended
assert result[0].resource_name == "admin@contoso.com"
assert result[0].resource_id == "user-789"
def test_single_exposed_credential_without_credential_type(self):
"""Test FAIL when a privileged user has exposed credentials without type."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "active"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
from prowler.providers.m365.services.defenderxdr.defenderxdr_service import (
ExposedCredentialPrivilegedUser,
)
exposed_user = ExposedCredentialPrivilegedUser(
edge_id="edge-123",
source_node_id="device-456",
source_node_name="WORKSTATION01",
source_node_label="device",
target_node_id="user-789",
target_node_name="admin@contoso.com",
target_node_label="user",
credential_type=None,
target_categories=["PrivilegedEntraIdRole"],
)
defenderxdr_client.exposed_credentials_privileged_users = [exposed_user]
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "admin@contoso.com" in result[0].status_extended
assert "WORKSTATION01" in result[0].status_extended
assert result[0].resource_name == "admin@contoso.com"
assert result[0].resource_id == "user-789"
def test_multiple_exposed_credentials(self):
"""Test FAIL for multiple privileged users with exposed credentials."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "active"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
from prowler.providers.m365.services.defenderxdr.defenderxdr_service import (
ExposedCredentialPrivilegedUser,
)
exposed_user_1 = ExposedCredentialPrivilegedUser(
edge_id="edge-123",
source_node_id="device-456",
source_node_name="WORKSTATION01",
source_node_label="device",
target_node_id="user-789",
target_node_name="admin@contoso.com",
target_node_label="user",
credential_type="CLI secret",
target_categories=["PrivilegedEntraIdRole"],
)
exposed_user_2 = ExposedCredentialPrivilegedUser(
edge_id="edge-456",
source_node_id="device-789",
source_node_name="SERVER01",
source_node_label="device",
target_node_id="user-012",
target_node_name="globaladmin@contoso.com",
target_node_label="user",
credential_type="user cookie",
target_categories=["PrivilegedEntraIdRole", "privileged"],
)
defenderxdr_client.exposed_credentials_privileged_users = [
exposed_user_1,
exposed_user_2,
]
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 2
assert result[0].status == "FAIL"
assert result[0].resource_name == "admin@contoso.com"
assert result[1].status == "FAIL"
assert result[1].resource_name == "globaladmin@contoso.com"
def test_exposed_credential_uses_edge_id_when_target_node_id_missing(self):
"""Test that edge_id is used as resource_id when target_node_id is empty."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "active"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
from prowler.providers.m365.services.defenderxdr.defenderxdr_service import (
ExposedCredentialPrivilegedUser,
)
exposed_user = ExposedCredentialPrivilegedUser(
edge_id="edge-fallback-123",
source_node_id="device-456",
source_node_name="WORKSTATION01",
source_node_label="device",
target_node_id="",
target_node_name="admin@contoso.com",
target_node_label="user",
credential_type="sensitive token",
target_categories=["PrivilegedEntraIdRole"],
)
defenderxdr_client.exposed_credentials_privileged_users = [exposed_user]
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "edge-fallback-123"
assert result[0].resource_name == "admin@contoso.com"

View File

@@ -1,895 +0,0 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.m365.services.entra.entra_service import (
OAuthApp,
OAuthAppPermission,
)
from tests.providers.m365.m365_fixtures import DOMAIN, set_mocked_m365_provider
class Test_entra_app_registration_no_unused_privileged_permissions:
def test_no_oauth_apps(self):
"""No OAuth apps registered in tenant (empty dict): expected PASS."""
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "No OAuth applications are registered in the tenant."
)
assert result[0].resource == {}
assert result[0].resource_name == "OAuth Applications"
assert result[0].resource_id == "oauthApps"
def test_no_oauth_apps_none(self):
"""OAuth apps is None (App Governance not enabled): expected FAIL."""
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = None
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "OAuth App Governance data is unavailable. Enable App Governance in Microsoft Defender for Cloud Apps and grant ThreatHunting.Read.All to evaluate unused privileged permissions."
)
assert result[0].resource == {}
assert result[0].resource_name == "OAuth Applications"
assert result[0].resource_id == "oauthApps"
def test_app_no_permissions(self):
"""App with no permissions: expected PASS."""
app_id = str(uuid4())
app_name = "Test App No Permissions"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name=app_name,
status="Enabled",
privilege_level="Low",
permissions=[],
service_principal_id=str(uuid4()),
is_admin_consented=False,
last_used_time=None,
app_origin="Internal",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"App registration {app_name} has no unused privileged permissions."
)
assert result[0].resource_name == app_name
assert result[0].resource_id == app_id
def test_app_all_permissions_in_use(self):
"""App with all privileged permissions in use: expected PASS."""
app_id = str(uuid4())
app_name = "Test App All In Use"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name=app_name,
status="Enabled",
privilege_level="High",
permissions=[
OAuthAppPermission(
name="Mail.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="InUse",
),
OAuthAppPermission(
name="User.Read.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="InUse",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=True,
last_used_time="2024-01-15T10:30:00Z",
app_origin="Internal",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"App registration {app_name} has no unused privileged permissions."
)
assert result[0].resource_name == app_name
assert result[0].resource_id == app_id
def test_app_low_privilege_unused(self):
"""App with unused low privilege permissions (not high): expected PASS."""
app_id = str(uuid4())
app_name = "Test App Low Privilege Unused"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name=app_name,
status="Enabled",
privilege_level="Low",
permissions=[
OAuthAppPermission(
name="User.Read",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Delegated",
privilege_level="Low",
usage_status="NotInUse",
),
OAuthAppPermission(
name="openid",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Delegated",
privilege_level="Low",
usage_status="NotInUse",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=False,
last_used_time=None,
app_origin="External",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"App registration {app_name} has no unused privileged permissions."
)
assert result[0].resource_name == app_name
assert result[0].resource_id == app_id
def test_app_medium_privilege_unused(self):
"""App with unused medium privilege permissions (not high): expected PASS."""
app_id = str(uuid4())
app_name = "Test App Medium Privilege Unused"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name=app_name,
status="Enabled",
privilege_level="Medium",
permissions=[
OAuthAppPermission(
name="Files.Read",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Delegated",
privilege_level="Medium",
usage_status="NotInUse",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=False,
last_used_time=None,
app_origin="External",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"App registration {app_name} has no unused privileged permissions."
)
assert result[0].resource_name == app_name
assert result[0].resource_id == app_id
def test_app_one_unused_high_privilege_permission(self):
"""App with one unused high privilege permission: expected FAIL."""
app_id = str(uuid4())
app_name = "Test App One Unused High"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name=app_name,
status="Enabled",
privilege_level="High",
permissions=[
OAuthAppPermission(
name="Mail.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="User.Read",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Delegated",
privilege_level="Low",
usage_status="InUse",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=True,
last_used_time="2024-01-15T10:30:00Z",
app_origin="Internal",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"App registration {app_name} has 1 unused privileged permission(s): Mail.ReadWrite.All."
)
assert result[0].resource_name == app_name
assert result[0].resource_id == app_id
def test_app_multiple_unused_high_privilege_permissions(self):
"""App with multiple unused high privilege permissions: expected FAIL."""
app_id = str(uuid4())
app_name = "Test App Multiple Unused High"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name=app_name,
status="Enabled",
privilege_level="High",
permissions=[
OAuthAppPermission(
name="Mail.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="Directory.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="User.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=True,
last_used_time="2024-01-15T10:30:00Z",
app_origin="External",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"App registration {app_name} has 3 unused privileged permission(s): Mail.ReadWrite.All, Directory.ReadWrite.All, User.ReadWrite.All."
)
assert result[0].resource_name == app_name
assert result[0].resource_id == app_id
def test_app_more_than_five_unused_high_privilege_permissions(self):
"""App with more than 5 unused high privilege permissions: expected FAIL with truncated list."""
app_id = str(uuid4())
app_name = "Test App Many Unused High"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name=app_name,
status="Enabled",
privilege_level="High",
permissions=[
OAuthAppPermission(
name="Mail.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="Directory.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="User.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="Group.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="Sites.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="RoleManagement.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="Application.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=True,
last_used_time="2024-01-15T10:30:00Z",
app_origin="External",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"App registration {app_name} has 7 unused privileged permission(s): Mail.ReadWrite.All, Directory.ReadWrite.All, User.ReadWrite.All, Group.ReadWrite.All, Sites.ReadWrite.All (and 2 more)."
)
assert result[0].resource_name == app_name
assert result[0].resource_id == app_id
def test_app_unused_with_not_in_use_status(self):
"""App with unused permission using 'not_in_use' status variant: expected FAIL."""
app_id = str(uuid4())
app_name = "Test App NotInUse Variant"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name=app_name,
status="Enabled",
privilege_level="High",
permissions=[
OAuthAppPermission(
name="Mail.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="not_in_use",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=True,
last_used_time=None,
app_origin="Internal",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"App registration {app_name} has 1 unused privileged permission(s): Mail.ReadWrite.All."
)
assert result[0].resource_name == app_name
assert result[0].resource_id == app_id
def test_multiple_apps_mixed_results(self):
"""Multiple apps with mixed results: one PASS and one FAIL."""
app_id_pass = str(uuid4())
app_name_pass = "Test App Pass"
app_id_fail = str(uuid4())
app_name_fail = "Test App Fail"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id_pass: OAuthApp(
id=app_id_pass,
name=app_name_pass,
status="Enabled",
privilege_level="High",
permissions=[
OAuthAppPermission(
name="Mail.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="InUse",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=True,
last_used_time="2024-01-15T10:30:00Z",
app_origin="Internal",
),
app_id_fail: OAuthApp(
id=app_id_fail,
name=app_name_fail,
status="Enabled",
privilege_level="High",
permissions=[
OAuthAppPermission(
name="Directory.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=True,
last_used_time="2024-01-15T10:30:00Z",
app_origin="External",
),
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 2
# Find results by app ID
result_pass = next(r for r in result if r.resource_id == app_id_pass)
result_fail = next(r for r in result if r.resource_id == app_id_fail)
assert result_pass.status == "PASS"
assert (
result_pass.status_extended
== f"App registration {app_name_pass} has no unused privileged permissions."
)
assert result_pass.resource_name == app_name_pass
assert result_fail.status == "FAIL"
assert (
result_fail.status_extended
== f"App registration {app_name_fail} has 1 unused privileged permission(s): Directory.ReadWrite.All."
)
assert result_fail.resource_name == app_name_fail
def test_app_mixed_privilege_levels_unused(self):
"""App with mixed privilege levels (High and Low) unused: only High triggers FAIL."""
app_id = str(uuid4())
app_name = "Test App Mixed Privileges"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name=app_name,
status="Enabled",
privilege_level="High",
permissions=[
OAuthAppPermission(
name="Mail.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="User.Read",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Delegated",
privilege_level="Low",
usage_status="NotInUse",
),
OAuthAppPermission(
name="Files.Read",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Delegated",
privilege_level="Medium",
usage_status="NotInUse",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=True,
last_used_time="2024-01-15T10:30:00Z",
app_origin="Internal",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
# Only the High privilege permission should be reported
assert (
result[0].status_extended
== f"App registration {app_name} has 1 unused privileged permission(s): Mail.ReadWrite.All."
)
assert result[0].resource_name == app_name
assert result[0].resource_id == app_id
def test_app_high_privilege_in_use_and_unused(self):
"""App with some high privilege permissions in use and some unused: expected FAIL."""
app_id = str(uuid4())
app_name = "Test App Partial Usage"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name=app_name,
status="Enabled",
privilege_level="High",
permissions=[
OAuthAppPermission(
name="Mail.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="InUse",
),
OAuthAppPermission(
name="Directory.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="User.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="InUse",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=True,
last_used_time="2024-01-15T10:30:00Z",
app_origin="Internal",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"App registration {app_name} has 1 unused privileged permission(s): Directory.ReadWrite.All."
)
assert result[0].resource_name == app_name
assert result[0].resource_id == app_id
def test_app_without_name_uses_id(self):
"""App without a name should use app_id as resource_name."""
app_id = str(uuid4())
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name="",
status="Enabled",
privilege_level="Low",
permissions=[],
service_principal_id=str(uuid4()),
is_admin_consented=False,
last_used_time=None,
app_origin="Internal",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_name == ""
assert result[0].resource_id == app_id

View File

@@ -1,274 +0,0 @@
from unittest import mock
from prowler.providers.m365.services.entra.entra_service import (
DirectorySyncSettings,
Organization,
)
from tests.providers.m365.m365_fixtures import set_mocked_m365_provider
class Test_entra_seamless_sso_disabled:
def test_seamless_sso_disabled(self):
"""Test PASS when Seamless SSO is disabled in directory sync settings."""
entra_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled import (
entra_seamless_sso_disabled,
)
sync_settings = DirectorySyncSettings(
id="sync-001",
password_sync_enabled=True,
seamless_sso_enabled=False,
)
entra_client.directory_sync_settings = [sync_settings]
entra_client.directory_sync_error = None
entra_client.organizations = []
check = entra_seamless_sso_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Entra directory sync sync-001 has Seamless SSO disabled."
)
assert result[0].resource_id == "sync-001"
assert result[0].resource_name == "Directory Sync sync-001"
assert result[0].location == "global"
def test_seamless_sso_enabled(self):
"""Test FAIL when Seamless SSO is enabled in directory sync settings."""
entra_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled import (
entra_seamless_sso_disabled,
)
sync_settings = DirectorySyncSettings(
id="sync-001",
password_sync_enabled=True,
seamless_sso_enabled=True,
)
entra_client.directory_sync_settings = [sync_settings]
entra_client.directory_sync_error = None
entra_client.organizations = []
check = entra_seamless_sso_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Entra directory sync sync-001 has Seamless SSO enabled, which can be exploited for lateral movement and brute force attacks."
)
assert result[0].resource_id == "sync-001"
assert result[0].resource_name == "Directory Sync sync-001"
assert result[0].location == "global"
def test_multiple_sync_settings_mixed(self):
"""Test mixed results with multiple directory sync configurations."""
entra_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled import (
entra_seamless_sso_disabled,
)
sync_settings_1 = DirectorySyncSettings(
id="sync-001",
password_sync_enabled=True,
seamless_sso_enabled=True,
)
sync_settings_2 = DirectorySyncSettings(
id="sync-002",
password_sync_enabled=True,
seamless_sso_enabled=False,
)
entra_client.directory_sync_settings = [sync_settings_1, sync_settings_2]
entra_client.directory_sync_error = None
entra_client.organizations = []
check = entra_seamless_sso_disabled()
result = check.execute()
assert len(result) == 2
assert result[0].status == "FAIL"
assert result[0].resource_id == "sync-001"
assert result[1].status == "PASS"
assert result[1].resource_id == "sync-002"
def test_cloud_only_no_sync_settings(self):
"""Test PASS for cloud-only tenant with no directory sync settings."""
entra_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled import (
entra_seamless_sso_disabled,
)
org = Organization(
id="org1",
name="Cloud Only Org",
on_premises_sync_enabled=False,
)
entra_client.directory_sync_settings = []
entra_client.directory_sync_error = None
entra_client.organizations = [org]
check = entra_seamless_sso_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Entra organization Cloud Only Org is cloud-only (no on-premises sync), Seamless SSO is not applicable."
)
assert result[0].resource_id == "org1"
assert result[0].resource_name == "Cloud Only Org"
def test_insufficient_permissions_error(self):
"""Test FAIL when there's a permission error reading directory sync settings."""
entra_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled import (
entra_seamless_sso_disabled,
)
org = Organization(
id="org1",
name="Prowler Org",
on_premises_sync_enabled=True,
)
entra_client.directory_sync_settings = []
entra_client.directory_sync_error = "Insufficient privileges to read directory sync settings. Required permission: OnPremDirectorySynchronization.Read.All or OnPremDirectorySynchronization.ReadWrite.All"
entra_client.organizations = [org]
check = entra_seamless_sso_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "Cannot verify Seamless SSO status" in result[0].status_extended
assert "Insufficient privileges" in result[0].status_extended
assert (
"OnPremDirectorySynchronization.Read.All" in result[0].status_extended
)
assert result[0].resource_id == "org1"
assert result[0].resource_name == "Prowler Org"
def test_insufficient_permissions_cloud_only_passes(self):
"""Test PASS for cloud-only org even when there's a permission error."""
entra_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled import (
entra_seamless_sso_disabled,
)
# Cloud-only org (on_premises_sync_enabled=False)
org = Organization(
id="org1",
name="Cloud Only Org",
on_premises_sync_enabled=False,
)
entra_client.directory_sync_settings = []
entra_client.directory_sync_error = (
"Insufficient privileges to read directory sync settings."
)
entra_client.organizations = [org]
check = entra_seamless_sso_disabled()
result = check.execute()
# Should PASS because cloud-only orgs don't need this permission
assert len(result) == 1
assert result[0].status == "PASS"
assert "cloud-only" in result[0].status_extended
assert result[0].resource_id == "org1"
def test_empty_everything(self):
"""Test no findings when both sync settings and organizations are empty."""
entra_client = mock.MagicMock()
entra_client.directory_sync_settings = []
entra_client.directory_sync_error = None
entra_client.organizations = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled import (
entra_seamless_sso_disabled,
)
check = entra_seamless_sso_disabled()
result = check.execute()
assert len(result) == 0

View File

@@ -6,7 +6,7 @@ All notable changes to the **Prowler UI** are documented in this file.
### 🚀 Added
- OpenStack provider support in the UI [(#10046)](https://github.com/prowler-cloud/prowler/pull/10046)
- Image (Container Registry) provider support in UI: badge icon, credentials form, and provider-type filtering
- PDF report available for the CSA CCM compliance framework [(#10088)](https://github.com/prowler-cloud/prowler/pull/10088)
- CSV and PDF download buttons in compliance views [(#10093)](https://github.com/prowler-cloud/prowler/pull/10093)

View File

@@ -10,10 +10,10 @@ import {
GCPProviderBadge,
GitHubProviderBadge,
IacProviderBadge,
ImageProviderBadge,
KS8ProviderBadge,
M365ProviderBadge,
MongoDBAtlasProviderBadge,
OpenStackProviderBadge,
OracleCloudProviderBadge,
} from "@/components/icons/providers-badge";
import {
@@ -34,10 +34,10 @@ const PROVIDER_ICON: Record<ProviderType, ReactNode> = {
m365: <M365ProviderBadge width={18} height={18} />,
github: <GitHubProviderBadge width={18} height={18} />,
iac: <IacProviderBadge width={18} height={18} />,
image: <ImageProviderBadge width={18} height={18} />,
oraclecloud: <OracleCloudProviderBadge width={18} height={18} />,
mongodbatlas: <MongoDBAtlasProviderBadge width={18} height={18} />,
alibabacloud: <AlibabaCloudProviderBadge width={18} height={18} />,
openstack: <OpenStackProviderBadge width={18} height={18} />,
};
interface AccountsSelectorProps {

View File

@@ -1,7 +1,7 @@
"use client";
import { useSearchParams } from "next/navigation";
import { lazy, Suspense } from "react";
import { type ComponentType, lazy, Suspense } from "react";
import {
MultiSelect,
@@ -48,6 +48,11 @@ const IacProviderBadge = lazy(() =>
default: m.IacProviderBadge,
})),
);
const ImageProviderBadge = lazy(() =>
import("@/components/icons/providers-badge").then((m) => ({
default: m.ImageProviderBadge,
})),
);
const OracleCloudProviderBadge = lazy(() =>
import("@/components/icons/providers-badge").then((m) => ({
default: m.OracleCloudProviderBadge,
@@ -63,11 +68,6 @@ const AlibabaCloudProviderBadge = lazy(() =>
default: m.AlibabaCloudProviderBadge,
})),
);
const OpenStackProviderBadge = lazy(() =>
import("@/components/icons/providers-badge").then((m) => ({
default: m.OpenStackProviderBadge,
})),
);
type IconProps = { width: number; height: number };
@@ -77,7 +77,7 @@ const IconPlaceholder = ({ width, height }: IconProps) => (
const PROVIDER_DATA: Record<
ProviderType,
{ label: string; icon: React.ComponentType<IconProps> }
{ label: string; icon: ComponentType<IconProps> }
> = {
aws: {
label: "Amazon Web Services",
@@ -107,6 +107,10 @@ const PROVIDER_DATA: Record<
label: "Infrastructure as Code",
icon: IacProviderBadge,
},
image: {
label: "Container Registry",
icon: ImageProviderBadge,
},
oraclecloud: {
label: "Oracle Cloud Infrastructure",
icon: OracleCloudProviderBadge,
@@ -119,10 +123,6 @@ const PROVIDER_DATA: Record<
label: "Alibaba Cloud",
icon: AlibabaCloudProviderBadge,
},
openstack: {
label: "OpenStack",
icon: OpenStackProviderBadge,
},
};
type ProviderTypeSelectorProps = {

View File

@@ -5,10 +5,10 @@ import {
GCPProviderBadge,
GitHubProviderBadge,
IacProviderBadge,
ImageProviderBadge,
KS8ProviderBadge,
M365ProviderBadge,
MongoDBAtlasProviderBadge,
OpenStackProviderBadge,
OracleCloudProviderBadge,
} from "../icons/providers-badge";
@@ -84,6 +84,15 @@ export const CustomProviderInputIac = () => {
);
};
export const CustomProviderInputImage = () => {
return (
<div className="flex items-center gap-x-2">
<ImageProviderBadge width={25} height={25} />
<p className="text-sm">Container Registry</p>
</div>
);
};
export const CustomProviderInputOracleCloud = () => {
return (
<div className="flex items-center gap-x-2">
@@ -101,12 +110,3 @@ export const CustomProviderInputAlibabaCloud = () => {
</div>
);
};
export const CustomProviderInputOpenStack = () => {
return (
<div className="flex items-center gap-x-2">
<OpenStackProviderBadge width={25} height={25} />
<p className="text-sm">OpenStack</p>
</div>
);
};

View File

@@ -5,10 +5,10 @@ import {
GCPProviderBadge,
GitHubProviderBadge,
IacProviderBadge,
ImageProviderBadge,
KS8ProviderBadge,
M365ProviderBadge,
MongoDBAtlasProviderBadge,
OpenStackProviderBadge,
OracleCloudProviderBadge,
} from "@/components/icons/providers-badge";
import { ProviderType } from "@/types";
@@ -21,10 +21,10 @@ export const PROVIDER_ICONS = {
m365: M365ProviderBadge,
github: GitHubProviderBadge,
iac: IacProviderBadge,
image: ImageProviderBadge,
oraclecloud: OracleCloudProviderBadge,
mongodbatlas: MongoDBAtlasProviderBadge,
alibabacloud: AlibabaCloudProviderBadge,
openstack: OpenStackProviderBadge,
} as const;
interface ProviderIconCellProps {

View File

@@ -5,7 +5,7 @@ import { useRouter, useSearchParams } from "next/navigation";
import { useEffect, useState } from "react";
import { Rectangle, ResponsiveContainer, Sankey, Tooltip } from "recharts";
import { PROVIDER_ICONS } from "@/components/icons/providers-badge";
import { PROVIDER_BADGE_BY_NAME } from "@/components/icons/providers-badge";
import { initializeChartColors } from "@/lib/charts/colors";
import { PROVIDER_DISPLAY_NAMES } from "@/types/providers";
import { SEVERITY_FILTER_MAP } from "@/types/severities";
@@ -209,7 +209,7 @@ const CustomNode = ({
}
};
const IconComponent = PROVIDER_ICONS[nodeName];
const IconComponent = PROVIDER_BADGE_BY_NAME[nodeName];
const hasIcon = IconComponent !== undefined;
const iconSize = 24;
const iconGap = 8;
@@ -620,7 +620,8 @@ export function SankeyChart({
</p>
<div className="flex flex-wrap gap-4">
{zeroDataProviders.map((provider) => {
const IconComponent = PROVIDER_ICONS[provider.displayName];
const IconComponent =
PROVIDER_BADGE_BY_NAME[provider.displayName];
return (
<div
key={provider.id}

View File

@@ -0,0 +1,36 @@
import { FC } from "react";
import { IconSvgProps } from "@/types";
export const ImageProviderBadge: FC<IconSvgProps> = ({
size,
width,
height,
...props
}) => (
<svg
xmlns="http://www.w3.org/2000/svg"
aria-hidden="true"
fill="none"
focusable="false"
height={size || height}
role="presentation"
viewBox="0 0 256 256"
width={size || width}
{...props}
>
<rect width="256" height="256" fill="#1c1917" rx="60" />
<g
transform="translate(20, 20) scale(9)"
fill="none"
stroke="#fff"
strokeWidth="2"
strokeLinecap="round"
strokeLinejoin="round"
>
<path d="M12.89 1.45L21 5.75V18.25L12.89 22.55C12.33 22.84 11.67 22.84 11.11 22.55L3 18.25V5.75L11.11 1.45C11.67 1.16 12.33 1.16 12.89 1.45Z" />
<path d="M3.5 6L12 10.5L20.5 6" />
<path d="M12 22.5V10.5" />
</g>
</svg>
);

View File

@@ -8,10 +8,10 @@ import { AzureProviderBadge } from "./azure-provider-badge";
import { GCPProviderBadge } from "./gcp-provider-badge";
import { GitHubProviderBadge } from "./github-provider-badge";
import { IacProviderBadge } from "./iac-provider-badge";
import { ImageProviderBadge } from "./image-provider-badge";
import { KS8ProviderBadge } from "./ks8-provider-badge";
import { M365ProviderBadge } from "./m365-provider-badge";
import { MongoDBAtlasProviderBadge } from "./mongodbatlas-provider-badge";
import { OpenStackProviderBadge } from "./openstack-provider-badge";
import { OracleCloudProviderBadge } from "./oraclecloud-provider-badge";
export {
@@ -21,15 +21,15 @@ export {
GCPProviderBadge,
GitHubProviderBadge,
IacProviderBadge,
ImageProviderBadge,
KS8ProviderBadge,
M365ProviderBadge,
MongoDBAtlasProviderBadge,
OpenStackProviderBadge,
OracleCloudProviderBadge,
};
// Map provider display names to their icon components
export const PROVIDER_ICONS: Record<string, FC<IconSvgProps>> = {
export const PROVIDER_BADGE_BY_NAME: Record<string, FC<IconSvgProps>> = {
AWS: AWSProviderBadge,
Azure: AzureProviderBadge,
"Google Cloud": GCPProviderBadge,
@@ -37,8 +37,8 @@ export const PROVIDER_ICONS: Record<string, FC<IconSvgProps>> = {
"Microsoft 365": M365ProviderBadge,
GitHub: GitHubProviderBadge,
"Infrastructure as Code": IacProviderBadge,
"Container Registry": ImageProviderBadge,
"Oracle Cloud Infrastructure": OracleCloudProviderBadge,
"MongoDB Atlas": MongoDBAtlasProviderBadge,
"Alibaba Cloud": AlibabaCloudProviderBadge,
OpenStack: OpenStackProviderBadge,
};

View File

@@ -1,29 +0,0 @@
import * as React from "react";
import { IconSvgProps } from "@/types";
export const OpenStackProviderBadge: React.FC<IconSvgProps> = ({
size,
width,
height,
...props
}) => (
<svg
xmlns="http://www.w3.org/2000/svg"
aria-hidden="true"
fill="none"
focusable="false"
height={size || height}
role="presentation"
viewBox="0 0 256 256"
width={size || width}
{...props}
>
<g fill="none">
<rect width="256" height="256" fill="#f4f2ed" rx="60" />
<g transform="translate(48 48) scale(2.5)" fill="#da1a32">
<path d="M58.054.68H5.946C2.676.68 0 3.356 0 6.626V20.64h14.452v-2.3c0-1.776 1.44-3.215 3.215-3.215h28.665c1.776 0 3.215 1.44 3.215 3.215v2.3H64v-14A5.97 5.97 0 0 0 58.054.68zm-8.506 44.97c0 1.776-1.44 3.215-3.215 3.215H17.67c-1.776 0-3.215-1.44-3.215-3.215v-2.3H0v14.013c0 3.27 2.676 5.946 5.946 5.946h52.108c3.27 0 5.946-2.676 5.946-5.946V43.36H49.548zM0 24.773h14.452v14.452H0zm49.548 0H64v14.452H49.548z" />
</g>
</g>
</svg>
);

View File

@@ -15,10 +15,10 @@ import {
GCPProviderBadge,
GitHubProviderBadge,
IacProviderBadge,
ImageProviderBadge,
KS8ProviderBadge,
M365ProviderBadge,
MongoDBAtlasProviderBadge,
OpenStackProviderBadge,
OracleCloudProviderBadge,
} from "../icons/providers-badge";
import { FormMessage } from "../ui/form";
@@ -64,6 +64,11 @@ const PROVIDERS = [
label: "Infrastructure as Code",
badge: IacProviderBadge,
},
{
value: "image",
label: "Container Registry",
badge: ImageProviderBadge,
},
{
value: "oraclecloud",
label: "Oracle Cloud Infrastructure",
@@ -74,11 +79,6 @@ const PROVIDERS = [
label: "Alibaba Cloud",
badge: AlibabaCloudProviderBadge,
},
{
value: "openstack",
label: "OpenStack",
badge: OpenStackProviderBadge,
},
] as const;
interface RadioGroupProviderProps {

View File

@@ -20,12 +20,12 @@ import {
GCPDefaultCredentials,
GCPServiceAccountKey,
IacCredentials,
ImageCredentials,
KubernetesCredentials,
M365CertificateCredentials,
M365ClientSecretCredentials,
MongoDBAtlasCredentials,
OCICredentials,
OpenStackCredentials,
ProviderType,
} from "@/types";
@@ -45,9 +45,9 @@ import {
import { AzureCredentialsForm } from "./via-credentials/azure-credentials-form";
import { GitHubCredentialsForm } from "./via-credentials/github-credentials-form";
import { IacCredentialsForm } from "./via-credentials/iac-credentials-form";
import { ImageCredentialsForm } from "./via-credentials/image-credentials-form";
import { KubernetesCredentialsForm } from "./via-credentials/k8s-credentials-form";
import { MongoDBAtlasCredentialsForm } from "./via-credentials/mongodbatlas-credentials-form";
import { OpenStackCredentialsForm } from "./via-credentials/openstack-credentials-form";
import { OracleCloudCredentialsForm } from "./via-credentials/oraclecloud-credentials-form";
type BaseCredentialsFormProps = {
@@ -180,6 +180,11 @@ export const BaseCredentialsForm = ({
control={form.control as unknown as Control<IacCredentials>}
/>
)}
{providerType === "image" && (
<ImageCredentialsForm
control={form.control as unknown as Control<ImageCredentials>}
/>
)}
{providerType === "oraclecloud" && (
<OracleCloudCredentialsForm
control={form.control as unknown as Control<OCICredentials>}
@@ -208,11 +213,6 @@ export const BaseCredentialsForm = ({
}
/>
)}
{providerType === "openstack" && (
<OpenStackCredentialsForm
control={form.control as unknown as Control<OpenStackCredentials>}
/>
)}
<div className="flex w-full justify-end gap-4">
{showBackButton && requiresBackButton(searchParamsObj.get("via")) && (

View File

@@ -57,6 +57,11 @@ const getProviderFieldDetails = (providerType?: ProviderType) => {
label: "Repository URL",
placeholder: "e.g. https://github.com/user/repo",
};
case "image":
return {
label: "Registry URL",
placeholder: "e.g. 123456789012.dkr.ecr.us-east-1.amazonaws.com",
};
case "oraclecloud":
return {
label: "Tenancy OCID",
@@ -72,11 +77,6 @@ const getProviderFieldDetails = (providerType?: ProviderType) => {
label: "Account ID",
placeholder: "e.g. 1234567890123456",
};
case "openstack":
return {
label: "Project ID",
placeholder: "e.g. a1b2c3d4-e5f6-7890-abcd-ef1234567890",
};
default:
return {
label: "Provider UID",

View File

@@ -0,0 +1,83 @@
import { Control } from "react-hook-form";
import { CustomInput } from "@/components/ui/custom";
import { ImageCredentials } from "@/types";
export const ImageCredentialsForm = ({
control,
}: {
control: Control<ImageCredentials>;
}) => {
return (
<>
<div className="flex flex-col">
<div className="text-md text-default-foreground leading-9 font-bold">
Connect via Registry Credentials
</div>
<div className="text-default-500 text-sm">
Provide registry credentials to authenticate with your container
registry (all fields are optional).
</div>
</div>
<CustomInput
control={control}
name="registry_username"
label="Registry Username (Optional)"
labelPlacement="inside"
placeholder="Username for registry authentication"
variant="bordered"
type="text"
isRequired={false}
/>
<CustomInput
control={control}
name="registry_password"
label="Registry Password (Optional)"
labelPlacement="inside"
placeholder="Password for registry authentication"
variant="bordered"
type="password"
isRequired={false}
/>
<CustomInput
control={control}
name="registry_token"
label="Registry Token (Optional)"
labelPlacement="inside"
placeholder="Token for registry authentication"
variant="bordered"
type="password"
isRequired={false}
/>
<div className="flex flex-col pt-2">
<div className="text-md text-default-foreground leading-9 font-bold">
Scan Scope
</div>
<div className="text-default-500 text-sm">
Limit which repositories and tags are scanned using regex patterns.
</div>
</div>
<CustomInput
control={control}
name="image_filter"
label="Image Filter (Optional)"
labelPlacement="inside"
placeholder="e.g. ^prod/.*"
variant="bordered"
type="text"
isRequired={false}
/>
<CustomInput
control={control}
name="tag_filter"
label="Tag Filter (Optional)"
labelPlacement="inside"
placeholder="e.g. ^(latest|v\d+\.\d+\.\d+)$"
variant="bordered"
type="text"
isRequired={false}
/>
</>
);
};

View File

@@ -1,6 +1,6 @@
export * from "./azure-credentials-form";
export * from "./github-credentials-form";
export * from "./iac-credentials-form";
export * from "./image-credentials-form";
export * from "./k8s-credentials-form";
export * from "./mongodbatlas-credentials-form";
export * from "./openstack-credentials-form";

View File

@@ -1,43 +0,0 @@
import { Control } from "react-hook-form";
import { CustomInput, CustomTextarea } from "@/components/ui/custom";
import { OpenStackCredentials } from "@/types";
export const OpenStackCredentialsForm = ({
control,
}: {
control: Control<OpenStackCredentials>;
}) => {
return (
<>
<div className="flex flex-col">
<div className="text-md text-default-foreground leading-9 font-bold">
Connect via Clouds YAML
</div>
<div className="text-default-500 text-sm">
Please provide your OpenStack clouds.yaml content and the cloud name.
</div>
</div>
<CustomTextarea
control={control}
name="clouds_yaml_content"
label="Clouds YAML Content"
labelPlacement="inside"
placeholder="Paste your clouds.yaml content here"
variant="bordered"
minRows={10}
isRequired
/>
<CustomInput
control={control}
name="clouds_yaml_cloud"
type="text"
label="Cloud Name"
labelPlacement="inside"
placeholder="e.g. mycloud"
variant="bordered"
isRequired
/>
</>
);
};

View File

@@ -5,10 +5,10 @@ import {
GCPProviderBadge,
GitHubProviderBadge,
IacProviderBadge,
ImageProviderBadge,
KS8ProviderBadge,
M365ProviderBadge,
MongoDBAtlasProviderBadge,
OpenStackProviderBadge,
OracleCloudProviderBadge,
} from "@/components/icons/providers-badge";
import { ProviderType } from "@/types";
@@ -29,14 +29,14 @@ export const getProviderLogo = (provider: ProviderType) => {
return <GitHubProviderBadge width={35} height={35} />;
case "iac":
return <IacProviderBadge width={35} height={35} />;
case "image":
return <ImageProviderBadge width={35} height={35} />;
case "oraclecloud":
return <OracleCloudProviderBadge width={35} height={35} />;
case "mongodbatlas":
return <MongoDBAtlasProviderBadge width={35} height={35} />;
case "alibabacloud":
return <AlibabaCloudProviderBadge width={35} height={35} />;
case "openstack":
return <OpenStackProviderBadge width={35} height={35} />;
default:
return null;
}
@@ -58,14 +58,14 @@ export const getProviderName = (provider: ProviderType): string => {
return "GitHub";
case "iac":
return "Infrastructure as Code";
case "image":
return "Container Registry";
case "oraclecloud":
return "Oracle Cloud Infrastructure";
case "mongodbatlas":
return "MongoDB Atlas";
case "alibabacloud":
return "Alibaba Cloud";
case "openstack":
return "OpenStack";
default:
return "Unknown Provider";
}

View File

@@ -192,12 +192,6 @@ export const useCredentialsForm = ({
[ProviderCredentialFields.ALIBABACLOUD_ACCESS_KEY_ID]: "",
[ProviderCredentialFields.ALIBABACLOUD_ACCESS_KEY_SECRET]: "",
};
case "openstack":
return {
...baseDefaults,
[ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CONTENT]: "",
[ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CLOUD]: "",
};
default:
return baseDefaults;
}

View File

@@ -30,8 +30,4 @@ export const PROVIDER_CREDENTIALS_ERROR_MAPPING: Record<string, string> = {
ProviderCredentialFields.SERVICE_ACCOUNT_KEY,
[ErrorPointers.ATLAS_PUBLIC_KEY]: ProviderCredentialFields.ATLAS_PUBLIC_KEY,
[ErrorPointers.ATLAS_PRIVATE_KEY]: ProviderCredentialFields.ATLAS_PRIVATE_KEY,
[ErrorPointers.OPENSTACK_CLOUDS_YAML_CONTENT]:
ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CONTENT,
[ErrorPointers.OPENSTACK_CLOUDS_YAML_CLOUD]:
ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CLOUD,
};

View File

@@ -43,6 +43,11 @@ export const getProviderHelpText = (provider: string) => {
text: "Need help scanning your Infrastructure as Code repository?",
link: "https://goto.prowler.com/provider-iac",
};
case "image":
return {
text: "Need help scanning your container registry?",
link: "https://goto.prowler.com/provider-image",
};
case "oraclecloud":
return {
text: "Need help connecting your Oracle Cloud account?",
@@ -58,11 +63,6 @@ export const getProviderHelpText = (provider: string) => {
text: "Need help connecting your Alibaba Cloud account?",
link: "https://goto.prowler.com/provider-alibabacloud",
};
case "openstack":
return {
text: "Need help connecting your OpenStack cloud?",
link: "https://goto.prowler.com/provider-openstack",
};
default:
return {
text: "How to setup a provider?",

View File

@@ -250,20 +250,6 @@ export const buildAlibabaCloudSecret = (
return filterEmptyValues(secret);
};
export const buildOpenStackSecret = (formData: FormData) => {
const secret = {
[ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CONTENT]: getFormValue(
formData,
ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CONTENT,
),
[ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CLOUD]: getFormValue(
formData,
ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CLOUD,
),
};
return filterEmptyValues(secret);
};
export const buildIacSecret = (formData: FormData) => {
const secret = {
[ProviderCredentialFields.REPOSITORY_URL]: getFormValue(
@@ -278,6 +264,32 @@ export const buildIacSecret = (formData: FormData) => {
return filterEmptyValues(secret);
};
export const buildImageSecret = (formData: FormData) => {
const secret = {
[ProviderCredentialFields.REGISTRY_USERNAME]: getFormValue(
formData,
ProviderCredentialFields.REGISTRY_USERNAME,
),
[ProviderCredentialFields.REGISTRY_PASSWORD]: getFormValue(
formData,
ProviderCredentialFields.REGISTRY_PASSWORD,
),
[ProviderCredentialFields.REGISTRY_TOKEN]: getFormValue(
formData,
ProviderCredentialFields.REGISTRY_TOKEN,
),
[ProviderCredentialFields.IMAGE_FILTER]: getFormValue(
formData,
ProviderCredentialFields.IMAGE_FILTER,
),
[ProviderCredentialFields.TAG_FILTER]: getFormValue(
formData,
ProviderCredentialFields.TAG_FILTER,
),
};
return filterEmptyValues(secret);
};
/**
* Utility function to safely encode a string to base64
* Handles UTF-8 characters properly without using deprecated APIs
@@ -371,6 +383,10 @@ export const buildSecretConfig = (
secretType: "static",
secret: buildIacSecret(formData),
}),
image: () => ({
secretType: "static",
secret: buildImageSecret(formData),
}),
oraclecloud: () => ({
secretType: "static",
secret: buildOracleCloudSecret(formData, providerUid),
@@ -387,10 +403,6 @@ export const buildSecretConfig = (
secret: buildAlibabaCloudSecret(formData, isRole),
};
},
openstack: () => ({
secretType: "static",
secret: buildOpenStackSecret(formData),
}),
};
const builder = secretBuilders[providerType];

View File

@@ -53,6 +53,13 @@ export const ProviderCredentialFields = {
REPOSITORY_URL: "repository_url",
ACCESS_TOKEN: "access_token",
// Image (Container Registry) fields
REGISTRY_USERNAME: "registry_username",
REGISTRY_PASSWORD: "registry_password",
REGISTRY_TOKEN: "registry_token",
IMAGE_FILTER: "image_filter",
TAG_FILTER: "tag_filter",
// OCI fields
OCI_USER: "user",
OCI_FINGERPRINT: "fingerprint",
@@ -67,10 +74,6 @@ export const ProviderCredentialFields = {
ALIBABACLOUD_ACCESS_KEY_SECRET: "access_key_secret",
ALIBABACLOUD_ROLE_ARN: "role_arn",
ALIBABACLOUD_ROLE_SESSION_NAME: "role_session_name",
// OpenStack fields
OPENSTACK_CLOUDS_YAML_CONTENT: "clouds_yaml_content",
OPENSTACK_CLOUDS_YAML_CLOUD: "clouds_yaml_cloud",
} as const;
// Type for credential field values
@@ -101,6 +104,11 @@ export const ErrorPointers = {
GITHUB_APP_KEY: "/data/attributes/secret/github_app_key_content",
REPOSITORY_URL: "/data/attributes/secret/repository_url",
ACCESS_TOKEN: "/data/attributes/secret/access_token",
REGISTRY_USERNAME: "/data/attributes/secret/registry_username",
REGISTRY_PASSWORD: "/data/attributes/secret/registry_password",
REGISTRY_TOKEN: "/data/attributes/secret/registry_token",
IMAGE_FILTER: "/data/attributes/secret/image_filter",
TAG_FILTER: "/data/attributes/secret/tag_filter",
CERTIFICATE_CONTENT: "/data/attributes/secret/certificate_content",
OCI_USER: "/data/attributes/secret/user",
OCI_FINGERPRINT: "/data/attributes/secret/fingerprint",
@@ -115,8 +123,6 @@ export const ErrorPointers = {
ALIBABACLOUD_ACCESS_KEY_SECRET: "/data/attributes/secret/access_key_secret",
ALIBABACLOUD_ROLE_ARN: "/data/attributes/secret/role_arn",
ALIBABACLOUD_ROLE_SESSION_NAME: "/data/attributes/secret/role_session_name",
OPENSTACK_CLOUDS_YAML_CONTENT: "/data/attributes/secret/clouds_yaml_content",
OPENSTACK_CLOUDS_YAML_CLOUD: "/data/attributes/secret/clouds_yaml_cloud",
} as const;
export type ErrorPointer = (typeof ErrorPointers)[keyof typeof ErrorPointers];

View File

@@ -304,6 +304,15 @@ export type IacCredentials = {
[ProviderCredentialFields.PROVIDER_ID]: string;
};
export type ImageCredentials = {
[ProviderCredentialFields.REGISTRY_USERNAME]?: string;
[ProviderCredentialFields.REGISTRY_PASSWORD]?: string;
[ProviderCredentialFields.REGISTRY_TOKEN]?: string;
[ProviderCredentialFields.IMAGE_FILTER]?: string;
[ProviderCredentialFields.TAG_FILTER]?: string;
[ProviderCredentialFields.PROVIDER_ID]: string;
};
export type OCICredentials = {
[ProviderCredentialFields.OCI_USER]: string;
[ProviderCredentialFields.OCI_FINGERPRINT]: string;
@@ -334,12 +343,6 @@ export type AlibabaCloudCredentialsRole = {
[ProviderCredentialFields.PROVIDER_ID]: string;
};
export type OpenStackCredentials = {
[ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CONTENT]: string;
[ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CLOUD]: string;
[ProviderCredentialFields.PROVIDER_ID]: string;
};
export type CredentialsFormSchema =
| AWSCredentials
| AWSCredentialsRole
@@ -348,12 +351,12 @@ export type CredentialsFormSchema =
| GCPServiceAccountKey
| KubernetesCredentials
| IacCredentials
| ImageCredentials
| M365Credentials
| OCICredentials
| MongoDBAtlasCredentials
| AlibabaCloudCredentials
| AlibabaCloudCredentialsRole
| OpenStackCredentials;
| AlibabaCloudCredentialsRole;
export interface SearchParamsProps {
[key: string]: string | string[] | undefined;

View File

@@ -115,6 +115,11 @@ export const addProviderFormSchema = z
[ProviderCredentialFields.PROVIDER_ALIAS]: z.string(),
providerUid: z.string(),
}),
z.object({
providerType: z.literal("image"),
[ProviderCredentialFields.PROVIDER_ALIAS]: z.string(),
providerUid: z.string(),
}),
z.object({
providerType: z.literal("oraclecloud"),
[ProviderCredentialFields.PROVIDER_ALIAS]: z.string(),
@@ -130,11 +135,6 @@ export const addProviderFormSchema = z
[ProviderCredentialFields.PROVIDER_ALIAS]: z.string(),
providerUid: z.string(),
}),
z.object({
providerType: z.literal("openstack"),
[ProviderCredentialFields.PROVIDER_ALIAS]: z.string(),
providerUid: z.string(),
}),
]),
);
@@ -264,14 +264,23 @@ export const addCredentialsFormSchema = (
.string()
.min(1, "Access Key Secret is required"),
}
: providerType === "openstack"
: providerType === "image"
? {
[ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CONTENT]:
z
.string()
.min(1, "Clouds YAML content is required"),
[ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CLOUD]:
z.string().min(1, "Cloud name is required"),
[ProviderCredentialFields.REGISTRY_USERNAME]: z
.string()
.optional(),
[ProviderCredentialFields.REGISTRY_PASSWORD]: z
.string()
.optional(),
[ProviderCredentialFields.REGISTRY_TOKEN]: z
.string()
.optional(),
[ProviderCredentialFields.IMAGE_FILTER]: z
.string()
.optional(),
[ProviderCredentialFields.TAG_FILTER]: z
.string()
.optional(),
}
: {}),
})
@@ -335,6 +344,18 @@ export const addCredentialsFormSchema = (
}
}
}
if (providerType === "image") {
const password = data[ProviderCredentialFields.REGISTRY_PASSWORD];
const username = data[ProviderCredentialFields.REGISTRY_USERNAME];
if (password && (!username || username.trim() === "")) {
ctx.addIssue({
code: "custom",
message: "Registry Username is required when providing a password",
path: [ProviderCredentialFields.REGISTRY_USERNAME],
});
}
}
});
export const addCredentialsRoleFormSchema = (providerType: string) =>

View File

@@ -7,9 +7,9 @@ export const PROVIDER_TYPES = [
"mongodbatlas",
"github",
"iac",
"image",
"oraclecloud",
"alibabacloud",
"openstack",
] as const;
export type ProviderType = (typeof PROVIDER_TYPES)[number];
@@ -23,9 +23,9 @@ export const PROVIDER_DISPLAY_NAMES: Record<ProviderType, string> = {
mongodbatlas: "MongoDB Atlas",
github: "GitHub",
iac: "Infrastructure as Code",
image: "Container Registry",
oraclecloud: "Oracle Cloud Infrastructure",
alibabacloud: "Alibaba Cloud",
openstack: "OpenStack",
};
export function getProviderDisplayName(providerId: string): string {