Compare commits

..

1 Commits

Author SHA1 Message Date
dependabot[bot]
03dfc3f434 build(deps): bump cryptography from 44.0.1 to 46.0.5 in /api
Bumps [cryptography](https://github.com/pyca/cryptography) from 44.0.1 to 46.0.5.
- [Changelog](https://github.com/pyca/cryptography/blob/main/CHANGELOG.rst)
- [Commits](https://github.com/pyca/cryptography/compare/44.0.1...46.0.5)

---
updated-dependencies:
- dependency-name: cryptography
  dependency-version: 46.0.5
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-02-18 15:31:17 +00:00
94 changed files with 125 additions and 8026 deletions

View File

@@ -6,7 +6,6 @@ All notable changes to the **Prowler API** are documented in this file.
### 🚀 Added
- `image` provider support for container image scanning [(#10128)](https://github.com/prowler-cloud/prowler/pull/10128)
- OpenStack provider support [(#10003)](https://github.com/prowler-cloud/prowler/pull/10003)
- PDF report for the CSA CCM compliance framework [(#10088)](https://github.com/prowler-cloud/prowler/pull/10088)

View File

@@ -5,7 +5,7 @@ LABEL maintainer="https://github.com/prowler-cloud/api"
ARG POWERSHELL_VERSION=7.5.0
ENV POWERSHELL_VERSION=${POWERSHELL_VERSION}
ARG TRIVY_VERSION=0.69.1
ARG TRIVY_VERSION=0.66.0
ENV TRIVY_VERSION=${TRIVY_VERSION}
# hadolint ignore=DL3008

2
api/poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.4 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand.
[[package]]
name = "about-time"

View File

@@ -1,38 +0,0 @@
from django.db import migrations
import api.db_utils
class Migration(migrations.Migration):
dependencies = [
("api", "0080_backfill_attack_paths_graph_data_ready"),
]
operations = [
migrations.AlterField(
model_name="provider",
name="provider",
field=api.db_utils.ProviderEnumField(
choices=[
("aws", "AWS"),
("azure", "Azure"),
("gcp", "GCP"),
("kubernetes", "Kubernetes"),
("m365", "M365"),
("github", "GitHub"),
("mongodbatlas", "MongoDB Atlas"),
("iac", "IaC"),
("oraclecloud", "Oracle Cloud Infrastructure"),
("alibabacloud", "Alibaba Cloud"),
("cloudflare", "Cloudflare"),
("openstack", "OpenStack"),
("image", "Image"),
],
default="aws",
),
),
migrations.RunSQL(
"ALTER TYPE provider ADD VALUE IF NOT EXISTS 'image';",
reverse_sql=migrations.RunSQL.noop,
),
]

View File

@@ -289,7 +289,6 @@ class Provider(RowLevelSecurityProtectedModel):
ALIBABACLOUD = "alibabacloud", _("Alibaba Cloud")
CLOUDFLARE = "cloudflare", _("Cloudflare")
OPENSTACK = "openstack", _("OpenStack")
IMAGE = "image", _("Image")
@staticmethod
def validate_aws_uid(value):
@@ -424,15 +423,6 @@ class Provider(RowLevelSecurityProtectedModel):
pointer="/data/attributes/uid",
)
@staticmethod
def validate_image_uid(value):
if not re.match(r"^[a-zA-Z0-9][a-zA-Z0-9._/:@-]{2,249}$", value):
raise ModelValidationError(
detail="Image provider ID must be a valid container image reference.",
code="image-uid",
pointer="/data/attributes/uid",
)
id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
inserted_at = models.DateTimeField(auto_now_add=True, editable=False)
updated_at = models.DateTimeField(auto_now=True, editable=False)

View File

@@ -2,7 +2,6 @@ import pytest
from rest_framework.exceptions import ValidationError
from api.v1.serializer_utils.integrations import S3ConfigSerializer
from api.v1.serializers import ImageProviderSecret
class TestS3ConfigSerializer:
@@ -99,37 +98,3 @@ class TestS3ConfigSerializer:
serializer = S3ConfigSerializer(data=data)
assert not serializer.is_valid()
assert "output_directory" in serializer.errors
class TestImageProviderSecret:
"""Test cases for ImageProviderSecret validation."""
def test_valid_no_credentials(self):
serializer = ImageProviderSecret(data={})
assert serializer.is_valid()
def test_valid_token_only(self):
serializer = ImageProviderSecret(data={"registry_token": "tok"})
assert serializer.is_valid()
def test_valid_username_and_password(self):
serializer = ImageProviderSecret(
data={"registry_username": "user", "registry_password": "pass"}
)
assert serializer.is_valid()
def test_valid_token_with_username_only(self):
serializer = ImageProviderSecret(
data={"registry_token": "tok", "registry_username": "user"}
)
assert serializer.is_valid()
def test_invalid_username_without_password(self):
serializer = ImageProviderSecret(data={"registry_username": "user"})
assert not serializer.is_valid()
assert "non_field_errors" in serializer.errors
def test_invalid_password_without_username(self):
serializer = ImageProviderSecret(data={"registry_password": "pass"})
assert not serializer.is_valid()
assert "non_field_errors" in serializer.errors

View File

@@ -24,7 +24,6 @@ from prowler.providers.cloudflare.cloudflare_provider import CloudflareProvider
from prowler.providers.gcp.gcp_provider import GcpProvider
from prowler.providers.github.github_provider import GithubProvider
from prowler.providers.iac.iac_provider import IacProvider
from prowler.providers.image.image_provider import ImageProvider
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
from prowler.providers.m365.m365_provider import M365Provider
from prowler.providers.mongodbatlas.mongodbatlas_provider import MongodbatlasProvider
@@ -123,7 +122,6 @@ class TestReturnProwlerProvider:
(Provider.ProviderChoices.ALIBABACLOUD.value, AlibabacloudProvider),
(Provider.ProviderChoices.CLOUDFLARE.value, CloudflareProvider),
(Provider.ProviderChoices.OPENSTACK.value, OpenstackProvider),
(Provider.ProviderChoices.IMAGE.value, ImageProvider),
],
)
def test_return_prowler_provider(self, provider_type, expected_provider):
@@ -190,47 +188,6 @@ class TestProwlerProviderConnectionTest:
assert isinstance(connection.error, Provider.secret.RelatedObjectDoesNotExist)
assert str(connection.error) == "Provider has no secret."
@patch("api.utils.return_prowler_provider")
def test_prowler_provider_connection_test_image_provider(
self, mock_return_prowler_provider
):
"""Test connection test for Image provider with credentials."""
provider = MagicMock()
provider.uid = "docker.io/myns/myimage:latest"
provider.provider = Provider.ProviderChoices.IMAGE.value
provider.secret.secret = {
"registry_username": "user",
"registry_password": "pass",
"registry_token": "tok123",
}
mock_return_prowler_provider.return_value = MagicMock()
prowler_provider_connection_test(provider)
mock_return_prowler_provider.return_value.test_connection.assert_called_once_with(
image="docker.io/myns/myimage:latest",
raise_on_exception=False,
registry_username="user",
registry_password="pass",
registry_token="tok123",
)
@patch("api.utils.return_prowler_provider")
def test_prowler_provider_connection_test_image_provider_no_creds(
self, mock_return_prowler_provider
):
"""Test connection test for Image provider without credentials."""
provider = MagicMock()
provider.uid = "alpine:3.18"
provider.provider = Provider.ProviderChoices.IMAGE.value
provider.secret.secret = {}
mock_return_prowler_provider.return_value = MagicMock()
prowler_provider_connection_test(provider)
mock_return_prowler_provider.return_value.test_connection.assert_called_once_with(
image="alpine:3.18",
raise_on_exception=False,
)
class TestGetProwlerProviderKwargs:
@pytest.mark.parametrize(
@@ -379,123 +336,6 @@ class TestGetProwlerProviderKwargs:
}
assert result == expected_result
def test_get_prowler_provider_kwargs_image_provider_registry_url(self):
"""Test that Image provider with a registry URL gets 'registry' kwarg."""
provider_uid = "docker.io/myns"
secret_dict = {
"registry_username": "user",
"registry_password": "pass",
}
secret_mock = MagicMock()
secret_mock.secret = secret_dict
provider = MagicMock()
provider.provider = Provider.ProviderChoices.IMAGE.value
provider.secret = secret_mock
provider.uid = provider_uid
result = get_prowler_provider_kwargs(provider)
expected_result = {
"registry": provider_uid,
"registry_username": "user",
"registry_password": "pass",
}
assert result == expected_result
def test_get_prowler_provider_kwargs_image_provider_image_ref(self):
"""Test that Image provider with a full image reference gets 'images' kwarg."""
provider_uid = "docker.io/myns/myimage:latest"
secret_dict = {
"registry_username": "user",
"registry_password": "pass",
}
secret_mock = MagicMock()
secret_mock.secret = secret_dict
provider = MagicMock()
provider.provider = Provider.ProviderChoices.IMAGE.value
provider.secret = secret_mock
provider.uid = provider_uid
result = get_prowler_provider_kwargs(provider)
expected_result = {
"images": [provider_uid],
"registry_username": "user",
"registry_password": "pass",
}
assert result == expected_result
def test_get_prowler_provider_kwargs_image_provider_dockerhub_image(self):
"""Test that Image provider with a short DockerHub image gets 'images' kwarg."""
provider_uid = "alpine:3.18"
secret_dict = {}
secret_mock = MagicMock()
secret_mock.secret = secret_dict
provider = MagicMock()
provider.provider = Provider.ProviderChoices.IMAGE.value
provider.secret = secret_mock
provider.uid = provider_uid
result = get_prowler_provider_kwargs(provider)
expected_result = {"images": [provider_uid]}
assert result == expected_result
def test_get_prowler_provider_kwargs_image_provider_filters_falsy_secrets(self):
"""Test that falsy secret values are filtered out for Image provider."""
provider_uid = "docker.io/myns/myimage:latest"
secret_dict = {
"registry_username": "",
"registry_password": "",
}
secret_mock = MagicMock()
secret_mock.secret = secret_dict
provider = MagicMock()
provider.provider = Provider.ProviderChoices.IMAGE.value
provider.secret = secret_mock
provider.uid = provider_uid
result = get_prowler_provider_kwargs(provider)
expected_result = {"images": [provider_uid]}
assert result == expected_result
def test_get_prowler_provider_kwargs_image_provider_ignores_mutelist(self):
"""Test that Image provider does NOT receive mutelist_content.
Image provider uses Trivy's built-in mutelist logic, so it should not
receive mutelist_content even when a mutelist processor is configured.
"""
provider_uid = "docker.io/myns/myimage:latest"
secret_dict = {
"registry_username": "user",
"registry_password": "pass",
}
secret_mock = MagicMock()
secret_mock.secret = secret_dict
mutelist_processor = MagicMock()
mutelist_processor.configuration = {"Mutelist": {"key": "value"}}
provider = MagicMock()
provider.provider = Provider.ProviderChoices.IMAGE.value
provider.secret = secret_mock
provider.uid = provider_uid
result = get_prowler_provider_kwargs(provider, mutelist_processor)
assert "mutelist_content" not in result
expected_result = {
"images": [provider_uid],
"registry_username": "user",
"registry_password": "pass",
}
assert result == expected_result
def test_get_prowler_provider_kwargs_unsupported_provider(self):
# Setup
provider_uid = "provider_uid"

View File

@@ -28,7 +28,6 @@ if TYPE_CHECKING:
from prowler.providers.gcp.gcp_provider import GcpProvider
from prowler.providers.github.github_provider import GithubProvider
from prowler.providers.iac.iac_provider import IacProvider
from prowler.providers.image.image_provider import ImageProvider
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
from prowler.providers.m365.m365_provider import M365Provider
from prowler.providers.mongodbatlas.mongodbatlas_provider import (
@@ -84,7 +83,6 @@ def return_prowler_provider(
| GcpProvider
| GithubProvider
| IacProvider
| ImageProvider
| KubernetesProvider
| M365Provider
| MongodbatlasProvider
@@ -97,7 +95,7 @@ def return_prowler_provider(
provider (Provider): The provider object containing the provider type and associated secrets.
Returns:
AlibabacloudProvider | AwsProvider | AzureProvider | CloudflareProvider | GcpProvider | GithubProvider | IacProvider | ImageProvider | KubernetesProvider | M365Provider | MongodbatlasProvider | OpenstackProvider | OraclecloudProvider: The corresponding provider class.
AlibabacloudProvider | AwsProvider | AzureProvider | CloudflareProvider | GcpProvider | GithubProvider | IacProvider | KubernetesProvider | M365Provider | MongodbatlasProvider | OpenstackProvider | OraclecloudProvider: The corresponding provider class.
Raises:
ValueError: If the provider type specified in `provider.provider` is not supported.
@@ -161,10 +159,6 @@ def return_prowler_provider(
from prowler.providers.openstack.openstack_provider import OpenstackProvider
prowler_provider = OpenstackProvider
case Provider.ProviderChoices.IMAGE.value:
from prowler.providers.image.image_provider import ImageProvider
prowler_provider = ImageProvider
case _:
raise ValueError(f"Provider type {provider.provider} not supported")
return prowler_provider
@@ -227,29 +221,11 @@ def get_prowler_provider_kwargs(
# clouds.yaml is not feasible because not all auth methods include it and the
# Keystone API is unavailable on public clouds.
pass
elif provider.provider == Provider.ProviderChoices.IMAGE.value:
# Detect whether uid is a registry URL (e.g. "docker.io/andoniaf") or
# a concrete image reference (e.g. "docker.io/andoniaf/myimage:latest").
from prowler.providers.image.image_provider import ImageProvider
if ImageProvider._is_registry_url(provider.uid):
prowler_provider_kwargs = {
"registry": provider.uid,
**{k: v for k, v in prowler_provider_kwargs.items() if v},
}
else:
prowler_provider_kwargs = {
"images": [provider.uid],
**{k: v for k, v in prowler_provider_kwargs.items() if v},
}
if mutelist_processor:
mutelist_content = mutelist_processor.configuration.get("Mutelist", {})
# IaC and Image providers don't support mutelist (both use Trivy's built-in logic)
if mutelist_content and provider.provider not in (
Provider.ProviderChoices.IAC.value,
Provider.ProviderChoices.IMAGE.value,
):
# IaC provider doesn't support mutelist (uses Trivy's built-in logic)
if mutelist_content and provider.provider != Provider.ProviderChoices.IAC.value:
prowler_provider_kwargs["mutelist_content"] = mutelist_content
return prowler_provider_kwargs
@@ -266,7 +242,6 @@ def initialize_prowler_provider(
| GcpProvider
| GithubProvider
| IacProvider
| ImageProvider
| KubernetesProvider
| M365Provider
| MongodbatlasProvider
@@ -280,7 +255,7 @@ def initialize_prowler_provider(
mutelist_processor (Processor): The mutelist processor object containing the mutelist configuration.
Returns:
AlibabacloudProvider | AwsProvider | AzureProvider | CloudflareProvider | GcpProvider | GithubProvider | IacProvider | ImageProvider | KubernetesProvider | M365Provider | MongodbatlasProvider | OpenstackProvider | OraclecloudProvider: An instance of the corresponding provider class
AlibabacloudProvider | AwsProvider | AzureProvider | CloudflareProvider | GcpProvider | GithubProvider | IacProvider | KubernetesProvider | M365Provider | MongodbatlasProvider | OpenstackProvider | OraclecloudProvider: An instance of the corresponding provider class
initialized with the provider's secrets.
"""
prowler_provider = return_prowler_provider(provider)
@@ -322,22 +297,6 @@ def prowler_provider_connection_test(provider: Provider) -> Connection:
"raise_on_exception": False,
}
return prowler_provider.test_connection(**openstack_kwargs)
elif provider.provider == Provider.ProviderChoices.IMAGE.value:
image_kwargs = {
"image": provider.uid,
"raise_on_exception": False,
}
if prowler_provider_kwargs.get("registry_username"):
image_kwargs["registry_username"] = prowler_provider_kwargs[
"registry_username"
]
if prowler_provider_kwargs.get("registry_password"):
image_kwargs["registry_password"] = prowler_provider_kwargs[
"registry_password"
]
if prowler_provider_kwargs.get("registry_token"):
image_kwargs["registry_token"] = prowler_provider_kwargs["registry_token"]
return prowler_provider.test_connection(**image_kwargs)
else:
return prowler_provider.test_connection(
**prowler_provider_kwargs,

View File

@@ -1528,8 +1528,6 @@ class BaseWriteProviderSecretSerializer(BaseWriteSerializer):
)
elif provider_type == Provider.ProviderChoices.OPENSTACK.value:
serializer = OpenStackCloudsYamlProviderSecret(data=secret)
elif provider_type == Provider.ProviderChoices.IMAGE.value:
serializer = ImageProviderSecret(data=secret)
else:
raise serializers.ValidationError(
{"provider": f"Provider type not supported {provider_type}"}
@@ -1704,30 +1702,6 @@ class OpenStackCloudsYamlProviderSecret(serializers.Serializer):
resource_name = "provider-secrets"
class ImageProviderSecret(serializers.Serializer):
registry_username = serializers.CharField(required=False)
registry_password = serializers.CharField(required=False)
registry_token = serializers.CharField(required=False)
class Meta:
resource_name = "provider-secrets"
def validate(self, attrs):
token = attrs.get("registry_token")
username = attrs.get("registry_username")
password = attrs.get("registry_password")
if not token:
if username and not password:
raise serializers.ValidationError(
"registry_password is required when registry_username is provided."
)
if password and not username:
raise serializers.ValidationError(
"registry_username is required when registry_password is provided."
)
return attrs
class AlibabaCloudProviderSecret(serializers.Serializer):
access_key_id = serializers.CharField()
access_key_secret = serializers.CharField()

View File

@@ -137,7 +137,6 @@ COMPLIANCE_CLASS_MAP = {
# IaC provider doesn't have specific compliance frameworks yet
# Trivy handles its own compliance checks
],
"image": [],
"oraclecloud": [
(lambda name: name.startswith("cis_"), OracleCloudCIS),
(lambda name: name.startswith("csa_"), OracleCloudCSA),

View File

@@ -10,7 +10,7 @@ Prowler's Image provider enables comprehensive container image security scanning
* **Trivy integration:** Prowler leverages [Trivy](https://trivy.dev/) to scan container images for vulnerabilities, secrets, misconfigurations, and license issues.
* **Trivy required:** Trivy must be installed and available in the system PATH before running any scan.
* **Authentication:** No registry authentication is required for public images. For private registries, credentials can be provided via environment variables or manual `docker login`.
* **Authentication:** No registry authentication is required for public images. For private registries, configure Docker credentials via `docker login` before scanning.
* **Output formats:** Results are output in the same formats as other Prowler providers (CSV, JSON, HTML, etc.).
## Prowler CLI
@@ -173,147 +173,25 @@ prowler image -I large-image:latest --timeout 10m
The timeout accepts values in seconds (`s`), minutes (`m`), or hours (`h`). Default: `5m`.
### Registry Scan Mode
Registry Scan Mode enumerates and scans all images from an OCI-compatible registry, Docker Hub namespace, or Amazon ECR registry. To activate it, use the `--registry` flag with the registry URL:
```bash
prowler image --registry myregistry.io
```
#### Discover Available Images
To list all repositories and tags available in the registry without running a scan, use the `--registry-list` flag. This is useful for discovering image names and tags before building filter regexes:
```bash
prowler image --registry myregistry.io --registry-list
```
Example output:
```text
Registry: myregistry.io (3 repositories, 8 images)
api-service (2 tags)
latest, v3.1
hub-scanner (3 tags)
latest, v1.0, v2.0
web-frontend (3 tags)
latest, v1.0, v2.0
```
Filters can be combined with `--registry-list` to preview the results before scanning:
```bash
prowler image --registry myregistry.io --registry-list --image-filter "api.*"
```
#### Filter Repositories
To filter repositories by name during enumeration, use the `--image-filter` flag with a Python regex pattern (matched via `re.search`):
```bash
# Scan only repositories starting with "prod/"
prowler image --registry myregistry.io --image-filter "^prod/"
```
#### Filter Tags
To filter tags during enumeration, use the `--tag-filter` flag with a Python regex pattern:
```bash
# Scan only semantic version tags
prowler image --registry myregistry.io --tag-filter "^v\d+\.\d+\.\d+$"
```
Both filters can be combined:
```bash
prowler image --registry myregistry.io --image-filter "^prod/" --tag-filter "^(latest|v\d+)"
```
#### Limit the Number of Images
To prevent accidentally scanning a large number of images, use the `--max-images` flag. The scan aborts if the discovered image count exceeds the limit:
```bash
prowler image --registry myregistry.io --max-images 10
```
Setting `--max-images` to `0` (default) disables the limit.
<Note>
When `--registry-list` is active, the `--max-images` limit is not enforced because no scan is performed.
</Note>
#### Skip TLS Verification
To connect to registries with self-signed certificates, use the `--registry-insecure` flag:
```bash
prowler image --registry internal-registry.local --registry-insecure
```
<Warning>
Skipping TLS verification disables certificate validation for registry connections. Use this flag only for trusted internal registries with self-signed certificates.
</Warning>
#### Supported Registries
Registry Scan Mode supports the following registry types:
* **OCI-compatible registries:** Any registry implementing the OCI Distribution Specification (e.g., Harbor, GitLab Container Registry, GitHub Container Registry).
* **Docker Hub:** Specify a namespace with `--registry docker.io/{org_or_user}`. Public namespaces can be scanned without credentials; authenticated access is used automatically when `REGISTRY_USERNAME` and `REGISTRY_PASSWORD` are set.
* **Amazon ECR:** Use the full ECR endpoint URL (e.g., `123456789.dkr.ecr.us-east-1.amazonaws.com`). Authentication is handled via AWS credentials.
### Authentication for Private Registries
To scan images from private registries, the Image provider supports three authentication methods. Prowler uses the first available method in this priority order:
#### 1. Basic Authentication (Environment Variables)
To authenticate with a username and password, set the `REGISTRY_USERNAME` and `REGISTRY_PASSWORD` environment variables. Prowler automatically runs `docker login`, pulls the image, and performs a `docker logout` after the scan completes:
```bash
export REGISTRY_USERNAME="myuser"
export REGISTRY_PASSWORD="mypassword"
prowler image -I myregistry.io/myapp:v1.0
```
Both variables must be set for this method to activate. Prowler handles the full lifecycle — login, pull, scan, and cleanup — without any manual Docker commands.
#### 2. Token-Based Authentication
To authenticate using a registry token (such as a bearer or OAuth2 token), set the `REGISTRY_TOKEN` environment variable. Prowler passes the token directly to Trivy:
```bash
export REGISTRY_TOKEN="my-registry-token"
prowler image -I myregistry.io/myapp:v1.0
```
This method is useful for registries that support token-based access without requiring a username and password.
#### 3. Manual Docker Login (Fallback)
If no environment variables are set, Prowler relies on existing credentials in Docker's credential store (`~/.docker/config.json`). To configure credentials manually before scanning:
The Image provider relies on Trivy for registry authentication. To scan images from private registries, configure Docker credentials before running the scan:
```bash
# Log in to a private registry
docker login myregistry.io
# Then scan the image
prowler image -I myregistry.io/myapp:v1.0
```
<Note>
When basic authentication is active (method 1), Prowler automatically logs out from all authenticated registries after the scan completes. Manual `docker login` sessions (method 3) are not affected by this cleanup.
</Note>
Trivy automatically uses credentials from Docker's credential store (`~/.docker/config.json`).
### Troubleshooting Common Scan Errors
The Image provider categorizes common Trivy errors with actionable guidance:
* **Authentication failure (401/403):** Registry credentials are missing or invalid. Verify the `REGISTRY_USERNAME`/`REGISTRY_PASSWORD` or `REGISTRY_TOKEN` environment variables, or run `docker login` for the target registry and retry the scan.
* **Authentication failure (401/403):** Registry credentials are missing or invalid. Run `docker login` for the target registry and retry the scan.
* **Image not found (404):** The specified image name, tag, or registry is incorrect. Verify the image reference exists and is accessible.
* **Rate limited (429):** The container registry is throttling requests. Wait before retrying, or authenticate to increase rate limits.
* **Network issue:** Trivy cannot reach the registry due to connectivity problems. Check network access, DNS resolution, and firewall rules.

View File

@@ -41,12 +41,8 @@ When using service principal authentication, add these **Application Permissions
- `AuditLog.Read.All`: Required for Entra service.
- `Directory.Read.All`: Required for all services.
- `OnPremDirectorySynchronization.Read.All`: Required for `entra_seamless_sso_disabled` check (hybrid deployments).
- `Policy.Read.All`: Required for all services.
- `SecurityIdentitiesHealth.Read.All`: Required for `defenderidentity_health_issues_no_open` check.
- `SecurityIdentitiesSensors.Read.All`: Required for `defenderidentity_health_issues_no_open` check.
- `SharePointTenantSettings.Read.All`: Required for SharePoint service.
- `ThreatHunting.Read.All`: Required for Defender XDR checks (`defenderxdr_endpoint_privileged_user_exposed_credentials`, `defenderxdr_critical_asset_management_pending_approvals`).
**External API Permissions:**
@@ -109,10 +105,7 @@ Browser and Azure CLI authentication methods limit scanning capabilities to chec
- `AuditLog.Read.All`: Required for Entra service
- `Directory.Read.All`: Required for all services
- `OnPremDirectorySynchronization.Read.All`: Required for `entra_seamless_sso_disabled` check (hybrid deployments)
- `Policy.Read.All`: Required for all services
- `SecurityIdentitiesHealth.Read.All`: Required for `defenderidentity_health_issues_no_open` check
- `SecurityIdentitiesSensors.Read.All`: Required for `defenderidentity_health_issues_no_open` check
- `SharePointTenantSettings.Read.All`: Required for SharePoint service
![Permission Screenshots](/images/providers/directory-permission.png)

4
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
[[package]]
name = "about-time"
@@ -4874,7 +4874,7 @@ description = "C parser in Python"
optional = false
python-versions = ">=3.8"
groups = ["main", "dev"]
markers = "platform_python_implementation != \"PyPy\" and implementation_name != \"PyPy\""
markers = "implementation_name != \"PyPy\" and platform_python_implementation != \"PyPy\""
files = [
{file = "pycparser-2.22-py3-none-any.whl", hash = "sha256:c3702b6d3dd8c7abc1afa565d7e63d53a1d0bd86cdc24edd75470f4de499cfcc"},
{file = "pycparser-2.22.tar.gz", hash = "sha256:491c8be9c040f5390f5bf44a5b07752bd07f56edf992381b05c701439eec10f6"},

View File

@@ -6,8 +6,6 @@ All notable changes to the **Prowler SDK** are documented in this file.
### 🚀 Added
- `entra_app_registration_no_unused_privileged_permissions` check for m365 provider [(#10080)](https://github.com/prowler-cloud/prowler/pull/10080)
- `defenderidentity_health_issues_no_open` check for M365 provider [(#10087)](https://github.com/prowler-cloud/prowler/pull/10087)
- `organization_verified_badge` check for GitHub provider [(#10033)](https://github.com/prowler-cloud/prowler/pull/10033)
- OpenStack provider `clouds_yaml_content` parameter for API integration [(#10003)](https://github.com/prowler-cloud/prowler/pull/10003)
- `defender_safe_attachments_policy_enabled` check for M365 provider [(#9833)](https://github.com/prowler-cloud/prowler/pull/9833)
@@ -22,10 +20,6 @@ All notable changes to the **Prowler SDK** are documented in this file.
- OpenStack compute 7 new checks [(#9944)](https://github.com/prowler-cloud/prowler/pull/9944)
- CSA CCM 4.0 for the Alibaba Cloud provider [(#10061)](https://github.com/prowler-cloud/prowler/pull/10061)
- ECS Exec (ECS-006) privilege escalation detection via `ecs:ExecuteCommand` + `ecs:DescribeTasks` [(#10066)](https://github.com/prowler-cloud/prowler/pull/10066)
- `defenderxdr_endpoint_privileged_user_exposed_credentials` check for M365 provider [(#10084)](https://github.com/prowler-cloud/prowler/pull/10084)
- `defenderxdr_critical_asset_management_pending_approvals` check for M365 provider [(#10085)](https://github.com/prowler-cloud/prowler/pull/10085)
- `entra_seamless_sso_disabled` check for m365 provider [(#10086)](https://github.com/prowler-cloud/prowler/pull/10086)
- Registry scan mode for `image` provider: enumerate and scan all images from OCI standard, Docker Hub, and ECR [(#9985)](https://github.com/prowler-cloud/prowler/pull/9985)
- Add file descriptor limits (`ulimits`) to Docker Compose worker services to prevent `Too many open files` errors [(#10107)](https://github.com/prowler-cloud/prowler/pull/10107)
### 🔄 Changed

View File

@@ -983,7 +983,6 @@
"Id": "5.1.5.1",
"Description": "Control when end users and group owners are allowed to grant consent to applications, and when they will be required to request administrator review and approval. Allowing users to grant apps access to data helps them acquire useful applications and be productive but can represent a risk in some situations if it's not monitored and controlled carefully.",
"Checks": [
"entra_app_registration_no_unused_privileged_permissions",
"entra_policy_restricts_user_consent_for_apps"
],
"Attributes": [

View File

@@ -1215,7 +1215,6 @@
"Id": "5.1.5.1",
"Description": "User consent to apps accessing company data on their behalf allows users to grant permissions to applications without administrator involvement. The recommended state is Do not allow user consent.",
"Checks": [
"entra_app_registration_no_unused_privileged_permissions",
"entra_policy_restricts_user_consent_for_apps"
],
"Attributes": [

View File

@@ -117,8 +117,6 @@
"defender_malware_policy_notifications_internal_users_malware_enabled",
"defender_safelinks_policy_enabled",
"defender_zap_for_teams_enabled",
"defenderxdr_endpoint_privileged_user_exposed_credentials",
"defender_identity_health_issues_no_open",
"entra_admin_users_phishing_resistant_mfa_enabled",
"entra_identity_protection_sign_in_risk_enabled",
"entra_identity_protection_user_risk_enabled"
@@ -156,7 +154,6 @@
}
],
"Checks": [
"defenderxdr_critical_asset_management_pending_approvals",
"sharepoint_external_sharing_managed",
"exchange_external_email_tagging_enabled"
]
@@ -202,8 +199,7 @@
"admincenter_users_admins_reduced_license_footprint",
"entra_admin_portals_access_restriction",
"entra_admin_users_phishing_resistant_mfa_enabled",
"entra_policy_guest_users_access_restrictions",
"entra_seamless_sso_disabled"
"entra_policy_guest_users_access_restrictions"
]
},
{
@@ -219,8 +215,7 @@
}
],
"Checks": [
"admincenter_settings_password_never_expire",
"entra_seamless_sso_disabled"
"admincenter_settings_password_never_expire"
]
},
{
@@ -236,13 +231,11 @@
}
],
"Checks": [
"defenderxdr_endpoint_privileged_user_exposed_credentials",
"entra_admin_users_sign_in_frequency_enabled",
"entra_admin_users_mfa_enabled",
"entra_admin_users_sign_in_frequency_enabled",
"entra_legacy_authentication_blocked",
"entra_managed_device_required_for_authentication",
"entra_seamless_sso_disabled",
"entra_users_mfa_enabled",
"exchange_organization_modern_authentication_enabled",
"exchange_transport_config_smtp_auth_disabled",
@@ -262,12 +255,11 @@
}
],
"Checks": [
"entra_admin_portals_access_restriction",
"entra_app_registration_no_unused_privileged_permissions",
"entra_policy_guest_users_access_restrictions",
"sharepoint_external_sharing_managed",
"sharepoint_external_sharing_restricted",
"sharepoint_guest_sharing_restricted"
"sharepoint_external_sharing_managed",
"sharepoint_guest_sharing_restricted",
"entra_policy_guest_users_access_restrictions",
"entra_admin_portals_access_restriction"
]
},
{
@@ -456,7 +448,6 @@
"defender_antispam_outbound_policy_configured",
"defender_antispam_outbound_policy_forwarding_disabled",
"defender_antispam_policy_inbound_no_allowed_domains",
"defenderxdr_critical_asset_management_pending_approvals",
"defender_chat_report_policy_configured",
"defender_malware_policy_common_attachments_filter_enabled",
"defender_malware_policy_comprehensive_attachments_filter_applied",
@@ -611,7 +602,6 @@
}
],
"Checks": [
"defenderxdr_endpoint_privileged_user_exposed_credentials",
"entra_managed_device_required_for_authentication",
"entra_users_mfa_enabled",
"entra_managed_device_required_for_mfa_registration",
@@ -639,17 +629,14 @@
"admincenter_users_admins_reduced_license_footprint",
"admincenter_users_between_two_and_four_global_admins",
"defender_antispam_outbound_policy_configured",
"defenderxdr_endpoint_privileged_user_exposed_credentials",
"entra_admin_consent_workflow_enabled",
"entra_admin_portals_access_restriction",
"entra_admin_users_cloud_only",
"entra_admin_users_mfa_enabled",
"entra_admin_users_phishing_resistant_mfa_enabled",
"entra_admin_users_sign_in_frequency_enabled",
"entra_app_registration_no_unused_privileged_permissions",
"entra_policy_ensure_default_user_cannot_create_tenants",
"entra_policy_guest_invite_only_for_admin_roles",
"entra_seamless_sso_disabled"
"entra_policy_guest_invite_only_for_admin_roles"
]
},
{
@@ -686,7 +673,6 @@
"entra_admin_users_sign_in_frequency_enabled",
"entra_admin_users_mfa_enabled",
"entra_managed_device_required_for_authentication",
"entra_seamless_sso_disabled",
"entra_users_mfa_enabled",
"entra_identity_protection_sign_in_risk_enabled"
]
@@ -729,9 +715,7 @@
"Checks": [
"defender_malware_policy_common_attachments_filter_enabled",
"defender_malware_policy_comprehensive_attachments_filter_applied",
"defender_malware_policy_notifications_internal_users_malware_enabled",
"defenderxdr_endpoint_privileged_user_exposed_credentials",
"defender_identity_health_issues_no_open"
"defender_malware_policy_notifications_internal_users_malware_enabled"
]
},
{
@@ -781,9 +765,8 @@
}
],
"Checks": [
"entra_app_registration_no_unused_privileged_permissions",
"entra_policy_restricts_user_consent_for_apps",
"entra_thirdparty_integrated_apps_not_allowed",
"entra_policy_restricts_user_consent_for_apps",
"teams_external_domains_restricted",
"teams_external_users_cannot_start_conversations"
]
@@ -874,10 +857,9 @@
}
],
"Checks": [
"entra_policy_restricts_user_consent_for_apps",
"admincenter_users_admins_reduced_license_footprint",
"defender_malware_policy_comprehensive_attachments_filter_applied",
"entra_app_registration_no_unused_privileged_permissions",
"entra_policy_restricts_user_consent_for_apps",
"entra_thirdparty_integrated_apps_not_allowed",
"sharepoint_modern_authentication_required"
]

View File

@@ -387,7 +387,6 @@
"Id": "1.2.4",
"Description": "Enable Identity Protection user risk policies",
"Checks": [
"defenderxdr_endpoint_privileged_user_exposed_credentials",
"entra_identity_protection_user_risk_enabled"
],
"Attributes": [
@@ -713,7 +712,6 @@
"Id": "1.3.3",
"Description": "Ensure third party integrated applications are not allowed",
"Checks": [
"entra_app_registration_no_unused_privileged_permissions",
"entra_thirdparty_integrated_apps_not_allowed"
],
"Attributes": [
@@ -750,7 +748,6 @@
"Id": "1.3.5",
"Description": "Ensure user consent to apps accessing company data on their behalf is not allowed",
"Checks": [
"entra_app_registration_no_unused_privileged_permissions",
"entra_policy_restricts_user_consent_for_apps"
],
"Attributes": [
@@ -1148,8 +1145,7 @@
"Id": "4.1.2",
"Description": "Ensure that password hash sync is enabled for hybrid deployments",
"Checks": [
"entra_password_hash_sync_enabled",
"entra_seamless_sso_disabled"
"entra_password_hash_sync_enabled"
],
"Attributes": [
{

View File

@@ -22,8 +22,8 @@ def load_checks_to_execute(
) -> set:
"""Generate the list of checks to execute based on the cloud provider and the input arguments given"""
try:
# Bypass check loading for providers that use Trivy directly
if provider in ("iac", "image"):
# Bypass check loading for IAC provider since it uses Trivy directly
if provider == "iac":
return set()
# Local subsets

View File

@@ -384,12 +384,10 @@ class Finding(BaseModel):
output_data["auth_method"] = provider.auth_method
output_data["account_uid"] = "image"
output_data["account_name"] = "image"
image_name = getattr(check_output, "resource_name", "")
image_sha = getattr(check_output, "image_sha", "")
output_data["resource_name"] = image_name
output_data["resource_uid"] = (
f"{image_name}:{image_sha}" if image_sha else image_name
output_data["resource_name"] = getattr(
check_output, "resource_name", ""
)
output_data["resource_uid"] = getattr(check_output, "resource_id", "")
output_data["region"] = getattr(check_output, "region", "container")
output_data["package_name"] = getattr(check_output, "package_name", "")
output_data["installed_version"] = getattr(

View File

@@ -930,56 +930,6 @@ class HTML(Output):
)
return ""
@staticmethod
def get_image_assessment_summary(provider: Provider) -> str:
"""
get_image_assessment_summary gets the HTML assessment summary for the Image provider
Args:
provider (Provider): the Image provider object
Returns:
str: the HTML assessment summary
"""
try:
if provider.registry:
target_info = f"<b>Registry URL:</b> {provider.registry}"
else:
target_info = f'<b>Images:</b> {", ".join(provider.images)}'
return f"""
<div class="col-md-2">
<div class="card">
<div class="card-header">
Image Assessment Summary
</div>
<ul class="list-group
list-group-flush">
<li class="list-group-item">
{target_info}
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
Image Credentials
</div>
<ul class="list-group
list-group-flush">
<li class="list-group-item">
<b>Image authentication method:</b> {provider.auth_method}
</li>
</ul>
</div>
</div>"""
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
return ""
@staticmethod
def get_llm_assessment_summary(provider: Provider) -> str:
"""

View File

@@ -27,7 +27,6 @@ from prowler.lib.scan.exceptions.exceptions import (
from prowler.providers.common.models import Audit_Metadata, ProviderOutputOptions
from prowler.providers.common.provider import Provider
from prowler.providers.iac.iac_provider import IacProvider
from prowler.providers.image.image_provider import ImageProvider
class Scan:
@@ -93,10 +92,10 @@ class Scan:
except ValueError:
raise ScanInvalidStatusError(f"Invalid status provided: {s}.")
# Special setup for IaC/Image providers - override inputs to work with traditional flow
if provider.type in ("iac", "image"):
# These providers don't use traditional Prowler checks, so clear all input parameters
# to avoid validation errors and let them flow through the normal logic
# Special setup for IaC provider - override inputs to work with traditional flow
if provider.type == "iac":
# IaC doesn't use traditional Prowler checks, so clear all input parameters
# to avoid validation errors and let it flow through the normal logic
checks = None
services = None
excluded_checks = None
@@ -161,8 +160,8 @@ class Scan:
)
# Load checks to execute
if provider.type in ("iac", "image"):
self._checks_to_execute = [f"{provider.type}_scan"]
if provider.type == "iac":
self._checks_to_execute = ["iac_scan"] # Dummy check name for IaC
else:
self._checks_to_execute = sorted(
load_checks_to_execute(
@@ -201,8 +200,8 @@ class Scan:
self._number_of_checks_to_execute = len(self._checks_to_execute)
# Set up service-based checks tracking
if provider.type in ("iac", "image"):
service_checks_to_execute = {provider.type: set([f"{provider.type}_scan"])}
if provider.type == "iac":
service_checks_to_execute = {"iac": set(["iac_scan"])}
else:
service_checks_to_execute = get_service_checks_to_execute(
self._checks_to_execute
@@ -347,75 +346,6 @@ class Scan:
self._duration = int((end_time - start_time).total_seconds())
return
# Special handling for Image provider
elif self._provider.type == "image":
if isinstance(self._provider, ImageProvider):
logger.info("Running Image scan with Trivy...")
total_images = len(self._provider.images)
images_completed = 0
for image_name, image_findings in self._provider.scan_per_image():
findings = []
for report in image_findings:
finding_uid = f"{report.check_metadata.CheckID}-{report.resource_name}-{report.resource_id}"
status_enum = (
Status.FAIL if report.status == "FAIL" else Status.PASS
)
if report.muted:
status_enum = Status.MUTED
image_sha = getattr(report, "image_sha", "")
resource_uid = (
f"{image_name}:{image_sha}" if image_sha else image_name
)
finding = Finding(
auth_method="Registry",
timestamp=datetime.datetime.now(timezone.utc),
account_uid=getattr(self._provider, "registry", None)
or "image",
account_name="Container Registry",
metadata=report.check_metadata,
uid=finding_uid,
status=status_enum,
status_extended=report.status_extended,
muted=report.muted,
resource_uid=resource_uid,
resource_metadata=report.resource,
resource_name=image_name,
resource_details=report.resource_details,
resource_tags={},
region=report.region,
compliance={},
raw=report.resource,
)
findings.append(finding)
# Filter the findings by the status
if self._status:
findings = [f for f in findings if f.status in self._status]
images_completed += 1
progress = (
images_completed / total_images * 100
if total_images > 0
else 100.0
)
yield (progress, findings)
# Update progress
self._number_of_checks_completed = 1
self._number_of_checks_to_execute = 1
# Calculate duration
end_time = datetime.datetime.now()
self._duration = int((end_time - start_time).total_seconds())
return
for check_name in checks_to_execute:
try:
# Recover service from check name

View File

@@ -50,32 +50,12 @@ class ImageBaseException(ProwlerException):
"message": "Invalid image config scanner type.",
"remediation": "Use valid image config scanners: misconfig, secret.",
},
(11013, "ImageRegistryAuthError"): {
"message": "Registry authentication failed.",
"remediation": "Check REGISTRY_USERNAME/REGISTRY_PASSWORD or REGISTRY_TOKEN environment variables.",
},
(11014, "ImageRegistryCatalogError"): {
"message": "Registry does not support catalog listing.",
"remediation": "Use --image or --image-list instead of --registry.",
},
(11015, "ImageRegistryNetworkError"): {
"message": "Network error communicating with registry.",
"remediation": "Check registry URL and network connectivity.",
},
(11016, "ImageMaxImagesExceededError"): {
"message": "Discovered images exceed --max-images limit.",
"remediation": "Use --image-filter or --tag-filter to narrow results, or increase --max-images.",
},
(11017, "ImageInvalidFilterError"): {
"message": "Invalid regex filter pattern.",
"remediation": "Check the regex syntax for --image-filter or --tag-filter.",
},
}
def __init__(self, code, file=None, original_exception=None, message=None):
error_info = self.IMAGE_ERROR_CODES.get((code, self.__class__.__name__))
if error_info and message:
error_info = {**error_info, "message": message}
if message:
error_info["message"] = message
super().__init__(
code,
source="Image",
@@ -182,48 +162,3 @@ class ImageInvalidConfigScannerError(ImageBaseException):
super().__init__(
11010, file=file, original_exception=original_exception, message=message
)
class ImageRegistryAuthError(ImageBaseException):
"""Exception raised when registry authentication fails."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
11013, file=file, original_exception=original_exception, message=message
)
class ImageRegistryCatalogError(ImageBaseException):
"""Exception raised when registry does not support catalog listing."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
11014, file=file, original_exception=original_exception, message=message
)
class ImageRegistryNetworkError(ImageBaseException):
"""Exception raised when a network error occurs communicating with a registry."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
11015, file=file, original_exception=original_exception, message=message
)
class ImageMaxImagesExceededError(ImageBaseException):
"""Exception raised when discovered images exceed --max-images limit."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
11016, file=file, original_exception=original_exception, message=message
)
class ImageInvalidFilterError(ImageBaseException):
"""Exception raised when an invalid regex filter pattern is provided."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
11017, file=file, original_exception=original_exception, message=message
)

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
import json
import os
import re
import subprocess
import sys
@@ -22,18 +21,13 @@ from prowler.providers.common.provider import Provider
from prowler.providers.image.exceptions.exceptions import (
ImageFindingProcessingError,
ImageInvalidConfigScannerError,
ImageInvalidFilterError,
ImageInvalidNameError,
ImageInvalidScannerError,
ImageInvalidSeverityError,
ImageInvalidTimeoutError,
ImageListFileNotFoundError,
ImageListFileReadError,
ImageMaxImagesExceededError,
ImageNoImagesProvidedError,
ImageRegistryAuthError,
ImageRegistryCatalogError,
ImageRegistryNetworkError,
ImageScanError,
ImageTrivyBinaryNotFoundError,
)
@@ -42,8 +36,6 @@ from prowler.providers.image.lib.arguments.arguments import (
SCANNERS_CHOICES,
SEVERITY_CHOICES,
)
from prowler.providers.image.lib.registry.dockerhub_adapter import DockerHubAdapter
from prowler.providers.image.lib.registry.factory import create_registry_adapter
class ImageProvider(Provider):
@@ -74,15 +66,6 @@ class ImageProvider(Provider):
config_path: str | None = None,
config_content: dict | None = None,
fixer_config: dict | None = None,
registry_username: str | None = None,
registry_password: str | None = None,
registry_token: str | None = None,
registry: str | None = None,
image_filter: str | None = None,
tag_filter: str | None = None,
max_images: int = 0,
registry_insecure: bool = False,
registry_list_images: bool = False,
):
logger.info("Instantiating Image Provider...")
@@ -99,53 +82,7 @@ class ImageProvider(Provider):
self.audited_account = "image-scan"
self._session = None
self._identity = "prowler"
self._listing_only = False
# Registry authentication (follows IaC pattern: explicit params, env vars internal)
self.registry_username = registry_username or os.environ.get(
"REGISTRY_USERNAME"
)
self.registry_password = registry_password or os.environ.get(
"REGISTRY_PASSWORD"
)
self.registry_token = registry_token or os.environ.get("REGISTRY_TOKEN")
if self.registry_username and self.registry_password:
self._auth_method = "Docker login"
logger.info("Using docker login for registry authentication")
elif self.registry_token:
self._auth_method = "Registry token"
logger.info("Using registry token for authentication")
else:
self._auth_method = "No auth"
# Registry scan mode
self.registry = registry
self.image_filter = image_filter
self.tag_filter = tag_filter
self.max_images = max_images
self.registry_insecure = registry_insecure
self.registry_list_images = registry_list_images
# Compile regex filters
self._image_filter_re = None
self._tag_filter_re = None
if self.image_filter:
try:
self._image_filter_re = re.compile(self.image_filter)
except re.error as exc:
raise ImageInvalidFilterError(
file=__file__,
message=f"Invalid --image-filter regex '{self.image_filter}': {exc}",
)
if self.tag_filter:
try:
self._tag_filter_re = re.compile(self.tag_filter)
except re.error as exc:
raise ImageInvalidFilterError(
file=__file__,
message=f"Invalid --tag-filter regex '{self.tag_filter}': {exc}",
)
self._auth_method = "No auth"
self._validate_inputs()
@@ -153,12 +90,6 @@ class ImageProvider(Provider):
if image_list_file:
self._load_images_from_file(image_list_file)
# Registry scan mode: enumerate images from registry
if self.registry:
self._enumerate_registry()
if self._listing_only:
return
for image in self.images:
self._validate_image_name(image)
@@ -314,72 +245,37 @@ class ImageProvider(Provider):
"""Image provider doesn't need a session since it uses Trivy directly"""
return None
@staticmethod
def _extract_registry(image: str) -> str | None:
"""Extract registry hostname from an image reference.
Returns None for Docker Hub images (no registry prefix).
"""
parts = image.split("/")
if len(parts) >= 2 and ("." in parts[0] or ":" in parts[0]):
return parts[0]
return None
@staticmethod
def _is_registry_url(image_uid: str) -> bool:
"""Determine whether an image UID is a registry URL (namespace only).
A registry URL like ``docker.io/andoniaf`` has a registry host but
the remaining part contains no ``/`` (no repo) and no ``:`` (no tag).
"""
registry_host = ImageProvider._extract_registry(image_uid)
if not registry_host:
return False
repo_and_tag = image_uid[len(registry_host) + 1 :]
return "/" not in repo_and_tag and ":" not in repo_and_tag
def cleanup(self) -> None:
"""Clean up any resources after scanning."""
def _process_finding(
self,
finding: dict,
image: str,
trivy_target: str,
image_sha: str = "",
self, finding: dict, image_name: str, finding_type: str
) -> CheckReportImage:
"""
Process a single finding and create a CheckReportImage object.
Args:
finding: The finding object from Trivy output
image: The clean container image name (e.g., "alpine:3.18")
trivy_target: The Trivy target string (e.g., "alpine:3.18 (alpine 3.18.0)")
image_sha: Short SHA from Trivy Metadata.ImageID for resource uniqueness
image_name: The container image name being scanned
finding_type: The type of finding (Vulnerability, Secret, etc.)
Returns:
CheckReportImage: The processed check report
"""
try:
# Determine finding ID and category based on type
# Determine finding ID based on type
if "VulnerabilityID" in finding:
finding_id = finding["VulnerabilityID"]
finding_description = finding.get(
"Description", finding.get("Title", "")
)
finding_status = "FAIL"
finding_categories = ["vulnerability"]
elif "RuleID" in finding:
# Secret finding
finding_id = finding["RuleID"]
finding_description = finding.get("Title", "Secret detected")
finding_status = "FAIL"
finding_categories = ["secrets"]
else:
finding_id = finding.get("ID", "UNKNOWN")
finding_description = finding.get("Description", "")
finding_status = finding.get("Status", "FAIL")
finding_categories = []
# Build remediation text for vulnerabilities
remediation_text = ""
@@ -398,7 +294,7 @@ class ImageProvider(Provider):
"CheckID": finding_id,
"CheckTitle": finding.get("Title", finding_id),
"CheckType": ["Container Image Security"],
"ServiceName": "container-image",
"ServiceName": finding_type,
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": trivy_severity,
@@ -408,7 +304,7 @@ class ImageProvider(Provider):
"Risk": finding.get(
"Description", "Vulnerability detected in container image"
),
"RelatedUrl": "",
"RelatedUrl": finding.get("PrimaryURL", ""),
"Remediation": {
"Code": {
"NativeIaC": "",
@@ -421,7 +317,7 @@ class ImageProvider(Provider):
"Url": finding.get("PrimaryURL", ""),
},
},
"Categories": finding_categories,
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
@@ -431,13 +327,11 @@ class ImageProvider(Provider):
metadata = json.dumps(metadata_dict)
report = CheckReportImage(
metadata=metadata, finding=finding, image_name=image
metadata=metadata, finding=finding, image_name=image_name
)
report.status = finding_status
report.status_extended = self._build_status_extended(finding)
report.region = self.region
report.image_sha = image_sha
report.resource_details = trivy_target
return report
except Exception as error:
@@ -474,36 +368,10 @@ class ImageProvider(Provider):
def run(self) -> list[CheckReportImage]:
"""Execute the container image scan."""
try:
reports = []
for batch in self.run_scan():
reports.extend(batch)
return reports
finally:
self.cleanup()
def scan_per_image(
self,
) -> Generator[tuple[str, list[CheckReportImage]], None, None]:
"""Scan images one by one, yielding (image_name, findings) per image.
Unlike run() which returns all findings at once, this method yields
after each image completes, enabling progress tracking.
"""
try:
for image in self.images:
try:
image_findings = []
for batch in self._scan_single_image(image):
image_findings.extend(batch)
yield (image, image_findings)
except (ImageScanError, ImageTrivyBinaryNotFoundError):
raise
except Exception as error:
logger.error(f"Error scanning image {image}: {error}")
yield (image, [])
finally:
self.cleanup()
reports = []
for batch in self.run_scan():
reports.extend(batch)
return reports
def run_scan(self) -> Generator[list[CheckReportImage], None, None]:
"""
@@ -586,19 +454,6 @@ class ImageProvider(Provider):
logger.info(f"No findings for image: {image}")
return
# Extract image digest for resource uniqueness
trivy_metadata = output.get("Metadata", {})
image_id = trivy_metadata.get("ImageID", "")
if not image_id:
repo_digests = trivy_metadata.get("RepoDigests", [])
if repo_digests:
image_id = (
repo_digests[0].split("@")[-1]
if "@" in repo_digests[0]
else ""
)
short_sha = image_id.replace("sha256:", "")[:12] if image_id else ""
except json.JSONDecodeError as error:
logger.error(f"Failed to parse Trivy output for {image}: {error}")
logger.debug(f"Trivy stdout: {process.stdout[:500]}")
@@ -609,12 +464,11 @@ class ImageProvider(Provider):
for result in results:
target = result.get("Target", image)
result_type = result.get("Type", "unknown")
# Process Vulnerabilities
for vuln in result.get("Vulnerabilities", []):
report = self._process_finding(
vuln, image, target, image_sha=short_sha
)
report = self._process_finding(vuln, target, result_type)
batch.append(report)
if len(batch) >= self.FINDING_BATCH_SIZE:
yield batch
@@ -622,9 +476,7 @@ class ImageProvider(Provider):
# Process Secrets
for secret in result.get("Secrets", []):
report = self._process_finding(
secret, image, target, image_sha=short_sha
)
report = self._process_finding(secret, target, "secret")
batch.append(report)
if len(batch) >= self.FINDING_BATCH_SIZE:
yield batch
@@ -633,7 +485,7 @@ class ImageProvider(Provider):
# Process Misconfigurations (from Dockerfile)
for misconfig in result.get("Misconfigurations", []):
report = self._process_finding(
misconfig, image, target, image_sha=short_sha
misconfig, target, "misconfiguration"
)
batch.append(report)
if len(batch) >= self.FINDING_BATCH_SIZE:
@@ -655,19 +507,8 @@ class ImageProvider(Provider):
)
logger.error(f"Error scanning image {image}: {error}")
def _build_trivy_env(self) -> dict:
"""Build environment variables for Trivy, injecting registry credentials."""
env = dict(os.environ)
if self.registry_username and self.registry_password:
env["TRIVY_USERNAME"] = self.registry_username
env["TRIVY_PASSWORD"] = self.registry_password
elif self.registry_token:
env["TRIVY_REGISTRY_TOKEN"] = self.registry_token
return env
def _execute_trivy(self, command: list, image: str) -> subprocess.CompletedProcess:
"""Execute Trivy command with optional progress bar."""
env = self._build_trivy_env()
try:
if sys.stdout.isatty():
with alive_bar(
@@ -682,7 +523,6 @@ class ImageProvider(Provider):
command,
capture_output=True,
text=True,
env=env,
)
bar.title = f"-> Scan completed for {image}"
return process
@@ -692,13 +532,12 @@ class ImageProvider(Provider):
command,
capture_output=True,
text=True,
env=env,
)
logger.info(f"Scan completed for {image}")
return process
except (AttributeError, OSError):
logger.info(f"Scanning {image}...")
return subprocess.run(command, capture_output=True, text=True, env=env)
return subprocess.run(command, capture_output=True, text=True)
def _log_trivy_stderr(self, stderr: str) -> None:
"""Parse and log Trivy's stderr output."""
@@ -757,105 +596,6 @@ class ImageProvider(Provider):
return error_msg
def _enumerate_registry(self) -> None:
"""Enumerate images from a registry using the appropriate adapter."""
verify_ssl = not self.registry_insecure
adapter = create_registry_adapter(
registry_url=self.registry,
username=self.registry_username,
password=self.registry_password,
token=self.registry_token,
verify_ssl=verify_ssl,
)
repositories = adapter.list_repositories()
logger.info(
f"Discovered {len(repositories)} repositories from registry {self.registry}"
)
# Apply image filter
if self._image_filter_re:
repositories = [r for r in repositories if self._image_filter_re.search(r)]
logger.info(
f"{len(repositories)} repositories match --image-filter '{self.image_filter}'"
)
if not repositories:
logger.warning(
f"No repositories found in registry {self.registry} (after filtering)"
)
return
# Determine if this is a Docker Hub adapter (for image reference format)
is_dockerhub = isinstance(adapter, DockerHubAdapter)
discovered_images = []
repos_tags: dict[str, list[str]] = {}
for repo in repositories:
tags = adapter.list_tags(repo)
# Apply tag filter
if self._tag_filter_re:
tags = [t for t in tags if self._tag_filter_re.search(t)]
if tags:
repos_tags[repo] = tags
for tag in tags:
if is_dockerhub:
# Docker Hub images don't need a host prefix
image_ref = f"{repo}:{tag}"
else:
# OCI registries need the full host/repo:tag reference
registry_host = self.registry.rstrip("/")
for prefix in ("https://", "http://"):
if registry_host.startswith(prefix):
registry_host = registry_host[len(prefix) :]
break
image_ref = f"{registry_host}/{repo}:{tag}"
discovered_images.append(image_ref)
# Registry list mode: print listing and return early
if self.registry_list_images:
self._print_registry_listing(repos_tags, len(discovered_images))
self._listing_only = True
return
# Check max-images limit
if self.max_images and len(discovered_images) > self.max_images:
raise ImageMaxImagesExceededError(
file=__file__,
message=f"Discovered {len(discovered_images)} images, exceeding --max-images {self.max_images}. Use --image-filter or --tag-filter to narrow results.",
)
# Deduplicate with explicit images
existing = set(self.images)
for img in discovered_images:
if img not in existing:
self.images.append(img)
existing.add(img)
logger.info(
f"Discovered {len(discovered_images)} images from registry {self.registry} "
f"({len(repositories)} repositories). Total images to scan: {len(self.images)}"
)
def _print_registry_listing(
self, repos_tags: dict[str, list[str]], total_images: int
) -> None:
"""Print a structured listing of registry repositories and tags."""
num_repos = len(repos_tags)
print(
f"\n{Style.BRIGHT}Registry:{Style.RESET_ALL} "
f"{Fore.CYAN}{self.registry}{Style.RESET_ALL} "
f"({num_repos} {'repository' if num_repos == 1 else 'repositories'}, "
f"{total_images} {'image' if total_images == 1 else 'images'})\n"
)
for repo, tags in repos_tags.items():
print(f" {Fore.YELLOW}{repo}{Style.RESET_ALL} " f"({len(tags)} tags)")
print(f" {', '.join(tags)}")
print()
def print_credentials(self) -> None:
"""Print scan configuration."""
report_title = f"{Style.BRIGHT}Scanning container images:{Style.RESET_ALL}"
@@ -888,23 +628,6 @@ class ImageProvider(Provider):
report_lines.append(f"Timeout: {Fore.YELLOW}{self.timeout}{Style.RESET_ALL}")
report_lines.append(
f"Authentication method: {Fore.YELLOW}{self.auth_method}{Style.RESET_ALL}"
)
if self.registry:
report_lines.append(
f"Registry: {Fore.YELLOW}{self.registry}{Style.RESET_ALL}"
)
if self.image_filter:
report_lines.append(
f"Image filter: {Fore.YELLOW}{self.image_filter}{Style.RESET_ALL}"
)
if self.tag_filter:
report_lines.append(
f"Tag filter: {Fore.YELLOW}{self.tag_filter}{Style.RESET_ALL}"
)
print_boxes(report_lines, report_title)
@staticmethod
@@ -912,29 +635,14 @@ class ImageProvider(Provider):
image: str | None = None,
raise_on_exception: bool = True,
provider_id: str | None = None,
registry_username: str | None = None,
registry_password: str | None = None,
registry_token: str | None = None,
) -> "Connection":
"""
Test connection to container registry by verifying image accessibility.
Handles two cases:
- Image reference (e.g. ``alpine:3.18``, ``ghcr.io/user/repo:tag``):
verifies the specific tag exists.
- Registry URL (e.g. ``docker.io/namespace``, ``ghcr.io/org``):
verifies we can list repositories in that namespace.
Uses registry HTTP APIs directly instead of Trivy to avoid false
failures caused by Trivy DB download issues.
Test connection to container registry by attempting to inspect an image.
Args:
image: Container image or registry URL to test
image: Container image to test
raise_on_exception: Whether to raise exceptions
provider_id: Fallback for image name
registry_username: Registry username for basic auth
registry_password: Registry password for basic auth
registry_token: Registry token for token-based auth
Returns:
Connection: Connection object with success status
@@ -946,65 +654,49 @@ class ImageProvider(Provider):
if not image:
return Connection(is_connected=False, error="Image name is required")
if ImageProvider._is_registry_url(image):
# Registry enumeration mode — test by listing repositories
adapter = create_registry_adapter(
registry_url=image,
username=registry_username,
password=registry_password,
token=registry_token,
)
adapter.list_repositories()
# Test by running trivy with --skip-update to just test image access
process = subprocess.run(
[
"trivy",
"image",
"--skip-db-update",
"--download-db-only=false",
image,
],
capture_output=True,
text=True,
timeout=60,
)
if process.returncode == 0:
return Connection(is_connected=True)
# Image reference mode — verify the specific tag exists
registry_host = ImageProvider._extract_registry(image)
repo_and_tag = image[len(registry_host) + 1 :] if registry_host else image
if ":" in repo_and_tag:
repository, tag = repo_and_tag.rsplit(":", 1)
else:
repository = repo_and_tag
tag = "latest"
error_msg = process.stderr or "Unknown error"
if "401" in error_msg or "unauthorized" in error_msg.lower():
return Connection(
is_connected=False,
error="Authentication failed. Check registry credentials.",
)
elif "not found" in error_msg.lower() or "404" in error_msg:
return Connection(
is_connected=False,
error="Image not found in registry.",
)
else:
return Connection(
is_connected=False,
error=f"Failed to access image: {error_msg[:200]}",
)
is_dockerhub = not registry_host or registry_host in (
"docker.io",
"registry-1.docker.io",
)
# Docker Hub official images use "library/" prefix
if is_dockerhub and "/" not in repository:
repository = f"library/{repository}"
if is_dockerhub:
registry_url = f"docker.io/{repository.split('/')[0]}"
else:
registry_url = registry_host
adapter = create_registry_adapter(
registry_url=registry_url,
username=registry_username,
password=registry_password,
token=registry_token,
)
tags = adapter.list_tags(repository)
if tag not in tags:
return Connection(
is_connected=False,
error=f"Tag '{tag}' not found for image '{image}'.",
)
return Connection(is_connected=True)
except ImageRegistryAuthError:
except subprocess.TimeoutExpired:
return Connection(
is_connected=False,
error="Authentication failed. Check registry credentials.",
error="Connection timed out",
)
except (ImageRegistryNetworkError, ImageRegistryCatalogError) as exc:
except FileNotFoundError:
return Connection(
is_connected=False,
error=f"Failed to access image: {str(exc)[:200]}",
error="Trivy binary not found. Please install Trivy.",
)
except Exception as error:
if raise_on_exception:

View File

@@ -88,96 +88,16 @@ def init_parser(self):
help="Trivy scan timeout. Default: 5m. Examples: 10m, 1h",
)
# Registry Scan Mode
registry_group = image_parser.add_argument_group("Registry Scan Mode")
registry_group.add_argument(
"--registry",
dest="registry",
default=None,
help="Registry URL to enumerate and scan all images. Examples: myregistry.io, docker.io/myorg, 123456789.dkr.ecr.us-east-1.amazonaws.com",
)
registry_group.add_argument(
"--image-filter",
dest="image_filter",
default=None,
help="Regex to filter repository names during registry enumeration (re.search). Example: '^prod/.*'",
)
registry_group.add_argument(
"--tag-filter",
dest="tag_filter",
default=None,
help=r"Regex to filter tags during registry enumeration (re.search). Example: '^(latest|v\d+\.\d+\.\d+)$'",
)
registry_group.add_argument(
"--max-images",
dest="max_images",
type=int,
default=0,
help="Maximum number of images to scan from registry. 0 = unlimited. Aborts if exceeded.",
)
registry_group.add_argument(
"--registry-insecure",
dest="registry_insecure",
action="store_true",
default=False,
help="Skip TLS verification for registry connections (for self-signed certificates).",
)
registry_group.add_argument(
"--registry-list",
dest="registry_list_images",
action="store_true",
default=False,
help="List all repositories and tags from the registry, then exit without scanning. Useful for discovering available images before building --image-filter or --tag-filter.",
)
def validate_arguments(arguments):
"""Validate Image provider arguments."""
images = getattr(arguments, "images", [])
image_list_file = getattr(arguments, "image_list_file", None)
registry = getattr(arguments, "registry", None)
image_filter = getattr(arguments, "image_filter", None)
tag_filter = getattr(arguments, "tag_filter", None)
max_images = getattr(arguments, "max_images", 0)
registry_insecure = getattr(arguments, "registry_insecure", False)
registry_list_images = getattr(arguments, "registry_list_images", False)
if registry_list_images and not registry:
return (False, "--registry-list requires --registry.")
if not images and not image_list_file and not registry:
if not images and not image_list_file:
return (
False,
"At least one image source must be specified using --image (-I), --image-list, or --registry.",
"At least one image must be specified using --image (-I) or --image-list.",
)
# Registry-only flags require --registry
if not registry:
if image_filter:
return (False, "--image-filter requires --registry.")
if tag_filter:
return (False, "--tag-filter requires --registry.")
if max_images:
return (False, "--max-images requires --registry.")
if registry_insecure:
return (False, "--registry-insecure requires --registry.")
# Docker Hub namespace validation
if registry:
url = registry.rstrip("/")
for prefix in ("https://", "http://"):
if url.startswith(prefix):
url = url[len(prefix) :]
break
stripped = url
for prefix in ("registry-1.docker.io", "docker.io"):
if stripped.startswith(prefix):
stripped = stripped[len(prefix) :].lstrip("/")
if not stripped:
return (
False,
"Docker Hub requires a namespace. Use --registry docker.io/{org_or_user}.",
)
break
return (True, "")

View File

@@ -1,146 +0,0 @@
"""Registry adapter abstract base class."""
from __future__ import annotations
import re
import time
from abc import ABC, abstractmethod
from urllib.parse import urlparse
import requests
from prowler.config.config import prowler_version
from prowler.lib.logger import logger
from prowler.providers.image.exceptions.exceptions import ImageRegistryNetworkError
_MAX_RETRIES = 3
_BACKOFF_BASE = 1
_USER_AGENT = f"Prowler/{prowler_version} (registry-adapter)"
class RegistryAdapter(ABC):
"""Abstract base class for registry adapters."""
def __init__(
self,
registry_url: str,
username: str | None = None,
password: str | None = None,
token: str | None = None,
verify_ssl: bool = True,
) -> None:
self.registry_url = registry_url
self.username = username
self._password = password
self._token = token
self.verify_ssl = verify_ssl
@property
def password(self) -> str | None:
return self._password
@property
def token(self) -> str | None:
return self._token
def __getstate__(self) -> dict:
state = self.__dict__.copy()
state["_password"] = "***" if state.get("_password") else None
state["_token"] = "***" if state.get("_token") else None
return state
def __repr__(self) -> str:
return (
f"{self.__class__.__name__}("
f"registry_url={self.registry_url!r}, "
f"username={self.username!r}, "
f"password={'<redacted>' if self._password else None}, "
f"token={'<redacted>' if self._token else None})"
)
@abstractmethod
def list_repositories(self) -> list[str]:
"""Enumerate all repository names in the registry."""
...
@abstractmethod
def list_tags(self, repository: str) -> list[str]:
"""Enumerate all tags for a repository."""
...
def _request_with_retry(self, method: str, url: str, **kwargs) -> requests.Response:
context_label = kwargs.pop("context_label", None) or self.registry_url
kwargs.setdefault("timeout", 30)
kwargs.setdefault("verify", self.verify_ssl)
headers = kwargs.get("headers", {})
headers.setdefault("User-Agent", _USER_AGENT)
kwargs["headers"] = headers
last_exception = None
last_status = None
last_body = None
for attempt in range(1, _MAX_RETRIES + 1):
try:
resp = requests.request(method, url, **kwargs)
if resp.status_code == 429:
last_status = 429
wait = _BACKOFF_BASE * (2 ** (attempt - 1))
logger.warning(
f"Rate limited by {context_label}, retrying in {wait}s (attempt {attempt}/{_MAX_RETRIES})"
)
time.sleep(wait)
continue
if resp.status_code >= 500:
last_status = resp.status_code
last_body = (resp.text or "")[:500]
wait = _BACKOFF_BASE * (2 ** (attempt - 1))
logger.warning(
f"Server error from {context_label} (HTTP {resp.status_code}), "
f"retrying in {wait}s (attempt {attempt}/{_MAX_RETRIES}): {last_body}"
)
time.sleep(wait)
continue
return resp
except requests.exceptions.ConnectionError as exc:
last_exception = exc
if attempt < _MAX_RETRIES:
wait = _BACKOFF_BASE * (2 ** (attempt - 1))
logger.warning(
f"Connection error to {context_label}, retrying in {wait}s (attempt {attempt}/{_MAX_RETRIES})"
)
time.sleep(wait)
continue
except requests.exceptions.Timeout as exc:
raise ImageRegistryNetworkError(
file=__file__,
message=f"Connection timed out to {context_label}.",
original_exception=exc,
)
if last_status == 429:
raise ImageRegistryNetworkError(
file=__file__,
message=f"Rate limited by {context_label} after {_MAX_RETRIES} attempts.",
)
if last_status is not None and last_status >= 500:
raise ImageRegistryNetworkError(
file=__file__,
message=f"Server error from {context_label} (HTTP {last_status}) after {_MAX_RETRIES} attempts: {last_body}",
)
raise ImageRegistryNetworkError(
file=__file__,
message=f"Failed to connect to {context_label} after {_MAX_RETRIES} attempts.",
original_exception=last_exception,
)
@staticmethod
def _next_page_url(resp: requests.Response) -> str | None:
link_header = resp.headers.get("Link", "")
if not link_header:
return None
match = re.search(r'<([^>]+)>;\s*rel="next"', link_header)
if match:
url = match.group(1)
if url.startswith("/"):
parsed = urlparse(resp.url)
return f"{parsed.scheme}://{parsed.netloc}{url}"
return url
return None

View File

@@ -1,221 +0,0 @@
"""Docker Hub registry adapter."""
from __future__ import annotations
import re
from typing import TYPE_CHECKING
from prowler.lib.logger import logger
from prowler.providers.image.exceptions.exceptions import (
ImageRegistryAuthError,
ImageRegistryCatalogError,
ImageRegistryNetworkError,
)
from prowler.providers.image.lib.registry.base import RegistryAdapter
if TYPE_CHECKING:
import requests
_HUB_API = "https://hub.docker.com"
_REGISTRY_HOST = "https://registry-1.docker.io"
_AUTH_URL = "https://auth.docker.io/token"
class DockerHubAdapter(RegistryAdapter):
"""Adapter for Docker Hub using the Hub REST API + OCI tag listing."""
def __init__(
self,
registry_url: str,
username: str | None = None,
password: str | None = None,
token: str | None = None,
verify_ssl: bool = True,
) -> None:
if not verify_ssl:
logger.warning(
"Docker Hub always uses TLS verification; --registry-insecure is ignored for Docker Hub registries."
)
super().__init__(registry_url, username, password, token, verify_ssl=True)
self.namespace = self._extract_namespace(registry_url)
self._hub_jwt: str | None = None
self._registry_tokens: dict[str, str] = {}
@staticmethod
def _extract_namespace(registry_url: str) -> str:
url = registry_url.rstrip("/")
for prefix in (
"https://registry-1.docker.io",
"http://registry-1.docker.io",
"https://docker.io",
"http://docker.io",
"registry-1.docker.io",
"docker.io",
"https://",
"http://",
):
if url.startswith(prefix):
url = url[len(prefix) :]
break
url = url.lstrip("/")
parts = url.split("/")
namespace = parts[0] if parts and parts[0] else ""
return namespace
def list_repositories(self) -> list[str]:
if not self.namespace:
raise ImageRegistryCatalogError(
file=__file__,
message="Docker Hub requires a namespace. Use --registry docker.io/{org_or_user}.",
)
self._hub_login()
repositories: list[str] = []
if self._hub_jwt:
url = f"{_HUB_API}/v2/namespaces/{self.namespace}/repositories"
else:
url = f"{_HUB_API}/v2/repositories/{self.namespace}/"
params: dict = {"page_size": 100}
while url:
resp = self._hub_request("GET", url, params=params)
self._check_hub_response(resp, "repository listing")
data = resp.json()
for repo in data.get("results", []):
name = repo.get("name", "")
if name:
repositories.append(f"{self.namespace}/{name}")
url = data.get("next")
params = {}
return repositories
def list_tags(self, repository: str) -> list[str]:
token = self._get_registry_token(repository)
tags: list[str] = []
url = f"{_REGISTRY_HOST}/v2/{repository}/tags/list"
params: dict = {"n": 100}
while url:
resp = self._registry_request("GET", url, token, params=params)
if resp.status_code in (401, 403):
raise ImageRegistryAuthError(
file=__file__,
message=f"Authentication failed for tag listing of {repository} on Docker Hub. Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
)
if resp.status_code != 200:
logger.warning(
f"Failed to list tags for {repository} (HTTP {resp.status_code}): {resp.text[:200]}"
)
break
data = resp.json()
tags.extend(data.get("tags", []) or [])
url = self._next_tag_page_url(resp)
params = {}
return tags
def _hub_login(self) -> None:
if self._hub_jwt:
return
if not self.username or not self.password:
return
logger.debug(f"Docker Hub login attempt for username: {self.username!r}")
resp = self._request_with_retry(
"POST",
f"{_HUB_API}/v2/users/login",
json={"username": self.username, "password": self.password},
context_label="Docker Hub",
)
if resp.status_code != 200:
body_preview = resp.text[:200] if resp.text else "(empty body)"
raise ImageRegistryAuthError(
file=__file__,
message=(
f"Docker Hub login failed (HTTP {resp.status_code}). "
f"Check REGISTRY_USERNAME and REGISTRY_PASSWORD. "
f"Response: {body_preview}"
),
)
self._hub_jwt = resp.json().get("token")
if not self._hub_jwt:
raise ImageRegistryAuthError(
file=__file__,
message="Docker Hub login returned an empty JWT token. Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
)
def _get_registry_token(self, repository: str) -> str:
if repository in self._registry_tokens:
return self._registry_tokens[repository]
params = {
"service": "registry.docker.io",
"scope": f"repository:{repository}:pull",
}
auth = None
if self.username and self.password:
auth = (self.username, self.password)
resp = self._request_with_retry(
"GET",
_AUTH_URL,
params=params,
auth=auth,
context_label="Docker Hub",
)
if resp.status_code != 200:
raise ImageRegistryAuthError(
file=__file__,
message=f"Failed to obtain Docker Hub registry token for {repository} (HTTP {resp.status_code}). Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
)
token = resp.json().get("token", "")
if not token:
raise ImageRegistryAuthError(
file=__file__,
message=f"Docker Hub registry token endpoint returned an empty token for {repository}. Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
)
self._registry_tokens[repository] = token
return token
def _hub_request(self, method: str, url: str, **kwargs) -> requests.Response:
headers = kwargs.pop("headers", {})
if self._hub_jwt:
headers["Authorization"] = f"Bearer {self._hub_jwt}"
kwargs["headers"] = headers
return self._request_with_retry(
method, url, context_label="Docker Hub", **kwargs
)
def _registry_request(
self, method: str, url: str, token: str, **kwargs
) -> requests.Response:
headers = kwargs.pop("headers", {})
headers["Authorization"] = f"Bearer {token}"
kwargs["headers"] = headers
return self._request_with_retry(
method, url, context_label="Docker Hub", **kwargs
)
def _check_hub_response(self, resp: requests.Response, context: str) -> None:
if resp.status_code == 200:
return
if resp.status_code in (401, 403):
raise ImageRegistryAuthError(
file=__file__,
message=f"Authentication failed for {context} on Docker Hub (HTTP {resp.status_code}). Check REGISTRY_USERNAME and REGISTRY_PASSWORD environment variables.",
)
if resp.status_code == 404:
raise ImageRegistryCatalogError(
file=__file__,
message=f"Namespace '{self.namespace}' not found on Docker Hub. Check the namespace in --registry docker.io/{{namespace}}.",
)
raise ImageRegistryNetworkError(
file=__file__,
message=f"Unexpected error during {context} on Docker Hub (HTTP {resp.status_code}): {resp.text[:200]}",
)
@staticmethod
def _next_tag_page_url(resp: requests.Response) -> str | None:
link_header = resp.headers.get("Link", "")
if not link_header:
return None
match = re.search(r'<([^>]+)>;\s*rel="next"', link_header)
if match:
next_url = match.group(1)
if next_url.startswith("/"):
return f"{_REGISTRY_HOST}{next_url}"
return next_url
return None

View File

@@ -1,40 +0,0 @@
"""Factory for auto-detecting registry type and returning the appropriate adapter."""
from __future__ import annotations
import re
from prowler.providers.image.lib.registry.base import RegistryAdapter
from prowler.providers.image.lib.registry.dockerhub_adapter import DockerHubAdapter
from prowler.providers.image.lib.registry.oci_adapter import OciRegistryAdapter
_DOCKER_HUB_PATTERN = re.compile(
r"^(https?://)?(docker\.io|registry-1\.docker\.io)(/|$)", re.IGNORECASE
)
def create_registry_adapter(
registry_url: str,
username: str | None = None,
password: str | None = None,
token: str | None = None,
verify_ssl: bool = True,
) -> RegistryAdapter:
"""Auto-detect registry type from URL and return the appropriate adapter."""
if _DOCKER_HUB_PATTERN.search(registry_url):
return DockerHubAdapter(
registry_url=registry_url,
username=username,
password=password,
token=token,
verify_ssl=verify_ssl,
)
# ECR and other non-Docker-Hub registries implement the OCI Distribution Spec,
# so they are handled by the generic OCI adapter.
return OciRegistryAdapter(
registry_url=registry_url,
username=username,
password=password,
token=token,
verify_ssl=verify_ssl,
)

View File

@@ -1,228 +0,0 @@
"""Generic OCI Distribution Spec registry adapter."""
from __future__ import annotations
import base64
import ipaddress
import re
from typing import TYPE_CHECKING
from urllib.parse import urlparse
from prowler.lib.logger import logger
from prowler.providers.image.exceptions.exceptions import (
ImageRegistryAuthError,
ImageRegistryCatalogError,
ImageRegistryNetworkError,
)
from prowler.providers.image.lib.registry.base import RegistryAdapter
if TYPE_CHECKING:
import requests
class OciRegistryAdapter(RegistryAdapter):
"""Adapter for registries implementing OCI Distribution Spec."""
def __init__(
self,
registry_url: str,
username: str | None = None,
password: str | None = None,
token: str | None = None,
verify_ssl: bool = True,
) -> None:
super().__init__(registry_url, username, password, token, verify_ssl)
self._base_url = self._normalise_url(registry_url)
self._bearer_token: str | None = None
self._basic_auth_verified = False
@staticmethod
def _normalise_url(url: str) -> str:
url = url.rstrip("/")
if not url.startswith(("http://", "https://")):
url = f"https://{url}"
return url
def list_repositories(self) -> list[str]:
self._ensure_auth()
repositories: list[str] = []
url = f"{self._base_url}/v2/_catalog"
params: dict = {"n": 200}
while url:
resp = self._authed_request("GET", url, params=params)
if resp.status_code == 404:
raise ImageRegistryCatalogError(
file=__file__,
message=f"Registry at {self.registry_url} does not support catalog listing (/_catalog returned 404). Use --image or --image-list instead.",
)
self._check_response(resp, "catalog listing")
data = resp.json()
repositories.extend(data.get("repositories", []))
url = self._next_page_url(resp)
params = {}
return repositories
def list_tags(self, repository: str) -> list[str]:
self._ensure_auth(repository=repository)
tags: list[str] = []
url = f"{self._base_url}/v2/{repository}/tags/list"
params: dict = {"n": 200}
while url:
resp = self._authed_request("GET", url, params=params)
self._check_response(resp, f"tag listing for {repository}")
data = resp.json()
tags.extend(data.get("tags", []) or [])
url = self._next_page_url(resp)
params = {}
return tags
def _ensure_auth(self, repository: str | None = None) -> None:
if self._bearer_token:
return
if self._basic_auth_verified:
return
if self.token:
self._bearer_token = self.token
return
ping_url = f"{self._base_url}/v2/"
resp = self._request_with_retry("GET", ping_url)
if resp.status_code == 200:
return
if resp.status_code == 401:
www_auth = resp.headers.get("Www-Authenticate", "")
if not www_auth.lower().startswith("bearer"):
# Basic auth challenge (e.g., AWS ECR)
if self.username and self.password:
self._basic_auth_verified = True
return
raise ImageRegistryAuthError(
file=__file__,
message=(
f"Registry {self.registry_url} requires authentication "
f"but no credentials provided. "
f"Set REGISTRY_USERNAME and REGISTRY_PASSWORD."
),
)
# Bearer token exchange (standard OCI flow)
self._bearer_token = self._obtain_bearer_token(www_auth, repository)
return
if resp.status_code == 403:
raise ImageRegistryAuthError(
file=__file__,
message=f"Access denied to registry {self.registry_url} (HTTP 403). Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
)
raise ImageRegistryNetworkError(
file=__file__,
message=f"Unexpected HTTP {resp.status_code} from registry {self.registry_url} during auth check.",
)
def _obtain_bearer_token(
self, www_authenticate: str, repository: str | None = None
) -> str:
match = re.search(r'realm="([^"]+)"', www_authenticate)
if not match:
raise ImageRegistryAuthError(
file=__file__,
message=f"Cannot parse token endpoint from registry {self.registry_url}. Www-Authenticate: {www_authenticate[:200]}",
)
realm = match.group(1)
self._validate_realm_url(realm)
params: dict = {}
service_match = re.search(r'service="([^"]+)"', www_authenticate)
if service_match:
params["service"] = service_match.group(1)
scope_match = re.search(r'scope="([^"]+)"', www_authenticate)
if scope_match:
params["scope"] = scope_match.group(1)
elif repository:
params["scope"] = f"repository:{repository}:pull"
auth = None
if self.username and self.password:
auth = (self.username, self.password)
resp = self._request_with_retry("GET", realm, params=params, auth=auth)
if resp.status_code != 200:
raise ImageRegistryAuthError(
file=__file__,
message=f"Failed to obtain bearer token from {realm} (HTTP {resp.status_code}). Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
)
data = resp.json()
token = data.get("token") or data.get("access_token", "")
if not token:
raise ImageRegistryAuthError(
file=__file__,
message=f"Token endpoint {realm} returned an empty token. Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
)
return token
@staticmethod
def _validate_realm_url(realm: str) -> None:
parsed = urlparse(realm)
if parsed.scheme not in ("http", "https"):
raise ImageRegistryAuthError(
file=__file__,
message=f"Bearer token realm has disallowed scheme: {parsed.scheme}. Only http/https are allowed.",
)
if parsed.scheme == "http":
logger.warning(f"Bearer token realm uses HTTP (not HTTPS): {realm}")
hostname = parsed.hostname or ""
try:
addr = ipaddress.ip_address(hostname)
if addr.is_private or addr.is_loopback or addr.is_link_local:
raise ImageRegistryAuthError(
file=__file__,
message=f"Bearer token realm points to a private/loopback address: {hostname}. This may indicate an SSRF attempt.",
)
except ValueError:
pass
def _resolve_basic_credentials(self) -> tuple[str | None, str | None]:
"""Decode pre-encoded base64 auth tokens (e.g., from aws ecr get-authorization-token).
Returns (username, password) — decoded if the password is a base64 token
containing 'username:real_password', otherwise returned as-is.
"""
if not self.password:
return self.username, self.password
try:
decoded = base64.b64decode(self.password).decode("utf-8")
if decoded.startswith(f"{self.username}:"):
return self.username, decoded[len(self.username) + 1 :]
except (ValueError, UnicodeDecodeError):
logger.debug("Password is not a base64-encoded auth token, using as-is")
return self.username, self.password
def _authed_request(self, method: str, url: str, **kwargs) -> requests.Response:
resp = self._do_authed_request(method, url, **kwargs)
if resp.status_code == 401 and self._bearer_token:
logger.debug(
f"Bearer token rejected (HTTP 401), re-authenticating to {self.registry_url}"
)
self._bearer_token = None
self._ensure_auth()
resp = self._do_authed_request(method, url, **kwargs)
return resp
def _do_authed_request(self, method: str, url: str, **kwargs) -> requests.Response:
headers = kwargs.pop("headers", {})
if self._bearer_token:
headers["Authorization"] = f"Bearer {self._bearer_token}"
elif self.username and self.password:
user, pwd = self._resolve_basic_credentials()
kwargs.setdefault("auth", (user, pwd))
kwargs["headers"] = headers
return self._request_with_retry(method, url, **kwargs)
def _check_response(self, resp: requests.Response, context: str) -> None:
if resp.status_code == 200:
return
if resp.status_code in (401, 403):
raise ImageRegistryAuthError(
file=__file__,
message=f"Authentication failed for {context} on {self.registry_url} (HTTP {resp.status_code}). Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
)
raise ImageRegistryNetworkError(
file=__file__,
message=f"Unexpected error during {context} on {self.registry_url} (HTTP {resp.status_code}): {resp.text[:200]}",
)

View File

@@ -1,6 +0,0 @@
from prowler.providers.common.provider import Provider
from prowler.providers.m365.services.defenderidentity.defenderidentity_service import (
DefenderIdentity,
)
defenderidentity_client = DefenderIdentity(Provider.get_global_provider())

View File

@@ -1,37 +0,0 @@
{
"Provider": "m365",
"CheckID": "defenderidentity_health_issues_no_open",
"CheckTitle": "Defender for Identity has no unresolved health issues affecting hybrid infrastructure monitoring",
"CheckType": [],
"ServiceName": "defenderidentity",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Defender for Identity Health Issue",
"ResourceGroup": "security",
"Description": "Microsoft Defender for Identity (MDI) monitors your hybrid identity infrastructure and detects advanced threats targeting Active Directory. This check verifies that MDI sensors are deployed and that there are no unresolved health issues that may affect the ability to detect identity-based attacks.",
"Risk": "Without deployed MDI sensors or with unresolved health issues, organizations face critical gaps in threat detection. Misconfigured or missing sensors fail to monitor domain controllers, allowing identity-based attacks like Pass-the-Hash, Golden Ticket, or lateral movement to go undetected. Attackers commonly exploit these blind spots to compromise hybrid environments while evading detection.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://learn.microsoft.com/en-us/defender-for-identity/health-alerts",
"https://learn.microsoft.com/en-us/graph/api/security-identitycontainer-list-healthissues"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Navigate to Microsoft Defender XDR portal at https://security.microsoft.com/\n2. Go to Settings > Identities > Health issues\n3. Review each open health issue and its recommendations\n4. Follow the specific remediation steps provided for each issue\n5. Verify the issue is resolved and status changes to closed",
"Terraform": ""
},
"Recommendation": {
"Text": "Regularly monitor and resolve Defender for Identity health issues to maintain comprehensive visibility into identity-based threats across your hybrid infrastructure.",
"Url": "https://hub.prowler.com/check/defenderidentity_health_issues_no_open"
}
},
"Categories": [
"e5"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check requires SecurityIdentitiesHealth.Read.All permission and a hybrid identity environment with Active Directory on-premises connected to Microsoft Defender for Identity. Health issues can be global (domain-related, such as Directory Services account issues or auditing misconfigurations) or sensor-specific. If no hybrid AD environment is configured, this check will pass with no health issues detected, as MDI only monitors on-premises Active Directory infrastructure."
}

View File

@@ -1,140 +0,0 @@
"""Check for open health issues in Microsoft Defender for Identity.
This module provides a security check that verifies there are no unresolved
health issues in the Microsoft Defender for Identity deployment.
"""
from typing import List
from prowler.lib.check.models import Check, CheckReportM365, Severity
from prowler.providers.m365.services.defenderidentity.defenderidentity_client import (
defenderidentity_client,
)
class defenderidentity_health_issues_no_open(Check):
"""Ensure Microsoft Defender for Identity has no unresolved health issues.
This check evaluates whether there are open health issues in the MDI deployment
that require attention to maintain proper hybrid identity protection.
- PASS: The health issue has been resolved (status is not open).
- FAIL: The health issue is open and requires attention.
- FAIL: No sensors are deployed (MDI cannot protect the environment).
"""
def execute(self) -> List[CheckReportM365]:
"""Execute the check for open MDI health issues.
This method iterates through all health issues from Microsoft Defender
for Identity and reports on their status. Open issues indicate potential
configuration problems or sensor health concerns that need resolution.
Returns:
List[CheckReportM365]: A list of reports containing the result of the check.
"""
findings = []
# Check sensors first - None means API error, empty list means no sensors
sensors_api_failed = defenderidentity_client.sensors is None
health_issues_api_failed = defenderidentity_client.health_issues is None
has_sensors = (
defenderidentity_client.sensors and len(defenderidentity_client.sensors) > 0
)
# If both APIs failed, it's likely a permission issue
if sensors_api_failed and health_issues_api_failed:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender for Identity",
resource_id="defenderIdentity",
)
report.status = "FAIL"
report.status_extended = (
"Defender for Identity APIs are not accessible. "
"Ensure the Service Principal has SecurityIdentitiesSensors.Read.All and "
"SecurityIdentitiesHealth.Read.All permissions granted."
)
findings.append(report)
return findings
# If only health issues API failed but we have sensors
if health_issues_api_failed and has_sensors:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender for Identity",
resource_id="defenderIdentity",
)
report.status = "FAIL"
report.status_extended = (
f"Cannot read health issues from Defender for Identity "
f"(found {len(defenderidentity_client.sensors)} sensor(s) deployed). "
"Ensure the Service Principal has SecurityIdentitiesHealth.Read.All permission."
)
findings.append(report)
return findings
# If no sensors are deployed (empty list, not None), MDI cannot monitor
if not has_sensors and not sensors_api_failed:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender for Identity",
resource_id="defenderIdentity",
)
report.status = "FAIL"
report.status_extended = (
"No sensors deployed in Defender for Identity. "
"Without sensors, MDI cannot monitor health issues in the environment. "
"Deploy sensors on domain controllers to enable protection."
)
findings.append(report)
return findings
# If health_issues is empty list - no issues exist, this is compliant
if not defenderidentity_client.health_issues:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender for Identity",
resource_id="defenderIdentity",
)
report.status = "PASS"
report.status_extended = (
"No open health issues found in Defender for Identity."
)
findings.append(report)
return findings
for health_issue in defenderidentity_client.health_issues:
report = CheckReportM365(
metadata=self.metadata(),
resource=health_issue,
resource_name=health_issue.display_name,
resource_id=health_issue.id,
)
issue_type = health_issue.health_issue_type or "unknown"
severity = health_issue.severity or "unknown"
status = (health_issue.status or "").lower()
if status != "open":
report.status = "PASS"
report.status_extended = f"Defender for Identity {issue_type} health issue {health_issue.display_name} is resolved."
else:
report.status = "FAIL"
report.status_extended = f"Defender for Identity {issue_type} health issue {health_issue.display_name} is open with {severity} severity."
# Adjust severity based on issue severity
if severity == "high":
report.check_metadata.Severity = Severity.high
elif severity == "medium":
report.check_metadata.Severity = Severity.medium
elif severity == "low":
report.check_metadata.Severity = Severity.low
findings.append(report)
return findings

View File

@@ -1,292 +0,0 @@
"""Microsoft Defender for Identity service module.
This module provides the DefenderIdentity service class for interacting with
Microsoft Defender for Identity (MDI) APIs, including health issues and sensors.
"""
import asyncio
from typing import List, Optional
from pydantic.v1 import BaseModel
from prowler.lib.logger import logger
from prowler.providers.m365.lib.service.service import M365Service
from prowler.providers.m365.m365_provider import M365Provider
class DefenderIdentity(M365Service):
"""Microsoft Defender for Identity service class.
This class provides methods to retrieve and manage Microsoft Defender for Identity
health issues, which monitor the health status of MDI configuration and sensors.
Attributes:
health_issues (list[HealthIssue]): List of health issues from MDI.
sensors (list[Sensor]): List of sensors from MDI.
"""
def __init__(self, provider: M365Provider):
"""Initialize the DefenderIdentity service client.
Args:
provider: The M365Provider instance for authentication and configuration.
"""
super().__init__(provider)
self.sensors: Optional[List[Sensor]] = []
self.health_issues: Optional[List[HealthIssue]] = []
created_loop = False
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
created_loop = True
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
created_loop = True
if loop.is_running():
raise RuntimeError(
"Cannot initialize DefenderIdentity service while event loop is running"
)
self.sensors = loop.run_until_complete(self._get_sensors())
self.health_issues = loop.run_until_complete(self._get_health_issues())
if created_loop:
asyncio.set_event_loop(None)
loop.close()
async def _get_sensors(self) -> Optional[List["Sensor"]]:
"""Retrieve sensors from Microsoft Defender for Identity.
This method fetches all MDI sensors deployed in the environment,
including their health status and configuration.
Returns:
Optional[List[Sensor]]: A list of sensors from MDI,
or None if the API call failed (tenant not onboarded or missing permissions).
"""
logger.info("DefenderIdentity - Getting sensors...")
sensors: Optional[List[Sensor]] = []
# Step 1: Call the API
try:
sensors_response = await self.client.security.identities.sensors.get()
except Exception as error:
error_msg = str(error)
if "403" in error_msg or "Forbidden" in error_msg:
logger.error(
"DefenderIdentity - Permission denied accessing sensors API. "
"Ensure the Service Principal has SecurityIdentitiesSensors.Read.All permission."
)
elif "401" in error_msg or "Unauthorized" in error_msg:
logger.error(
"DefenderIdentity - Authentication failed accessing sensors API. "
"Verify the Service Principal credentials are valid."
)
else:
logger.error(
f"DefenderIdentity - API error getting sensors: "
f"{error.__class__.__name__}: {error}"
)
return None
# Step 2: Parse the response
try:
while sensors_response:
for sensor in getattr(sensors_response, "value", []) or []:
sensors.append(
Sensor(
id=getattr(sensor, "id", ""),
display_name=getattr(sensor, "display_name", ""),
sensor_type=(
str(getattr(sensor, "sensor_type", ""))
if getattr(sensor, "sensor_type", None)
else None
),
deployment_status=(
str(getattr(sensor, "deployment_status", ""))
if getattr(sensor, "deployment_status", None)
else None
),
health_status=(
str(getattr(sensor, "health_status", ""))
if getattr(sensor, "health_status", None)
else None
),
open_health_issues_count=getattr(
sensor, "open_health_issues_count", 0
)
or 0,
domain_name=getattr(sensor, "domain_name", ""),
version=getattr(sensor, "version", ""),
created_date_time=str(
getattr(sensor, "created_date_time", "")
),
)
)
next_link = getattr(sensors_response, "odata_next_link", None)
if not next_link:
break
sensors_response = (
await self.client.security.identities.sensors.with_url(
next_link
).get()
)
except Exception as error:
logger.error(
f"DefenderIdentity - Error parsing sensors response: "
f"{error.__class__.__name__}: {error}"
)
return None
return sensors
async def _get_health_issues(self) -> Optional[List["HealthIssue"]]:
"""Retrieve health issues from Microsoft Defender for Identity.
This method fetches all health issues from the MDI deployment including
both global and sensor-specific health alerts.
Returns:
Optional[List[HealthIssue]]: A list of health issues from MDI,
or None if the API call failed (tenant not onboarded or missing permissions).
"""
logger.info("DefenderIdentity - Getting health issues...")
health_issues: Optional[List[HealthIssue]] = []
# Step 1: Call the API
try:
health_issues_response = (
await self.client.security.identities.health_issues.get()
)
except Exception as error:
error_msg = str(error)
if "403" in error_msg or "Forbidden" in error_msg:
logger.error(
"DefenderIdentity - Permission denied accessing health issues API. "
"Ensure the Service Principal has SecurityIdentitiesHealth.Read.All permission."
)
elif "401" in error_msg or "Unauthorized" in error_msg:
logger.error(
"DefenderIdentity - Authentication failed accessing health issues API. "
"Verify the Service Principal credentials are valid."
)
else:
logger.error(
f"DefenderIdentity - API error getting health issues: "
f"{error.__class__.__name__}: {error}"
)
return None
# Step 2: Parse the response
try:
while health_issues_response:
for issue in getattr(health_issues_response, "value", []) or []:
health_issues.append(
HealthIssue(
id=getattr(issue, "id", ""),
display_name=getattr(issue, "display_name", ""),
description=getattr(issue, "description", ""),
health_issue_type=getattr(issue, "health_issue_type", None),
severity=getattr(issue, "severity", None),
status=getattr(issue, "status", None),
created_date_time=str(
getattr(issue, "created_date_time", "")
),
last_modified_date_time=str(
getattr(issue, "last_modified_date_time", "")
),
domain_names=getattr(issue, "domain_names", []) or [],
sensor_dns_names=getattr(issue, "sensor_d_n_s_names", [])
or [],
issue_type_id=getattr(issue, "issue_type_id", None),
recommendations=getattr(issue, "recommendations", []) or [],
additional_information=getattr(
issue, "additional_information", []
)
or [],
)
)
next_link = getattr(health_issues_response, "odata_next_link", None)
if not next_link:
break
health_issues_response = (
await self.client.security.identities.health_issues.with_url(
next_link
).get()
)
except Exception as error:
logger.error(
f"DefenderIdentity - Error parsing health issues response: "
f"{error.__class__.__name__}: {error}"
)
return None
return health_issues
class Sensor(BaseModel):
"""Model for Microsoft Defender for Identity sensor.
Attributes:
id: The unique identifier for the sensor.
display_name: The display name of the sensor.
sensor_type: The type of sensor (domainControllerIntegrated, domainControllerStandalone, adfsIntegrated).
deployment_status: The deployment status (upToDate, outdated, updating, updateFailed, notConfigured).
health_status: The health status of the sensor (healthy, notHealthyLow, notHealthyMedium, notHealthyHigh).
open_health_issues_count: Number of open health issues for this sensor.
domain_name: The domain name the sensor is monitoring.
version: The version of the sensor.
created_date_time: When the sensor was created.
"""
id: str
display_name: str
sensor_type: Optional[str]
deployment_status: Optional[str]
health_status: Optional[str]
open_health_issues_count: int
domain_name: str
version: str
created_date_time: str
class HealthIssue(BaseModel):
"""Model for Microsoft Defender for Identity health issue.
Attributes:
id: The unique identifier for the health issue.
display_name: The display name of the health issue.
description: A detailed description of the health issue.
health_issue_type: The type of health issue (global or sensor).
severity: The severity level of the issue (low, medium, high).
status: The current status of the issue (open, closed).
created_date_time: When the issue was created.
last_modified_date_time: When the issue was last modified.
domain_names: List of domain names affected by the issue.
sensor_dns_names: List of sensor DNS names affected by the issue.
issue_type_id: The type identifier for the issue.
recommendations: List of recommended actions to resolve the issue.
additional_information: Additional information about the issue.
"""
id: str
display_name: str
description: str
health_issue_type: Optional[str]
severity: Optional[str]
status: Optional[str]
created_date_time: str
last_modified_date_time: str
domain_names: List[str]
sensor_dns_names: List[str]
issue_type_id: Optional[str]
recommendations: List[str]
additional_information: List[str]

View File

@@ -1,4 +0,0 @@
from prowler.providers.common.provider import Provider
from prowler.providers.m365.services.defenderxdr.defenderxdr_service import DefenderXDR
defenderxdr_client = DefenderXDR(Provider.get_global_provider())

View File

@@ -1,37 +0,0 @@
{
"Provider": "m365",
"CheckID": "defenderxdr_critical_asset_management_pending_approvals",
"CheckTitle": "Ensure all Critical Asset Management classifications are reviewed and approved in Microsoft Defender XDR",
"CheckType": [],
"ServiceName": "defenderxdr",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Defender XDR Critical Asset Management",
"ResourceGroup": "security",
"Description": "Assets with a lower classification confidence score in Microsoft Defender XDR must be approved by a security administrator.\n\nAsset classifications that have not yet been reviewed and approved may result in incomplete **critical asset** visibility.",
"Risk": "Stale pending approvals lead to limited visibility in Microsoft Defender XDR. **Critical assets** that are not properly identified and classified may not receive appropriate security monitoring and protections, creating gaps in the organization's security posture.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://learn.microsoft.com/en-us/security-exposure-management/classify-critical-assets",
"https://learn.microsoft.com/en-us/security-exposure-management/classify-critical-assets#review-critical-assets"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Navigate to **Microsoft Defender** at https://security.microsoft.com/\n2. Go to **Settings** > **Microsoft Defender XDR** > **Critical asset management**\n3. Review each pending approval listed in the check results\n4. Verify the correct classification for each asset\n5. Approve or reject the classification as appropriate",
"Terraform": ""
},
"Recommendation": {
"Text": "Regularly review and approve pending critical asset classifications to ensure accurate asset visibility in Microsoft Defender XDR. Stale approvals reduce the effectiveness of security monitoring and incident response for critical assets.",
"Url": "https://hub.prowler.com/check/defenderxdr_critical_asset_management_pending_approvals"
}
},
"Categories": [
"e5"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check requires Microsoft Defender XDR with Security Exposure Management enabled. The ThreatHunting.Read.All permission is required to query the ExposureGraphNodes table via the Advanced Hunting API. Approved assets will be reflected in the classification table within 24 hours."
}

View File

@@ -1,86 +0,0 @@
"""Check for pending Critical Asset Management approvals in Defender XDR.
This check identifies asset classifications with low confidence scores
that require security administrator review and approval.
"""
from typing import List
from prowler.lib.check.models import Check, CheckReportM365
from prowler.providers.m365.services.defenderxdr.defenderxdr_client import (
defenderxdr_client,
)
class defenderxdr_critical_asset_management_pending_approvals(Check):
"""Check for pending Critical Asset Management approvals in Microsoft Defender XDR.
This check queries Advanced Hunting to identify assets with low classification
confidence scores that have not been reviewed by a security administrator.
Prerequisites:
1. ThreatHunting.Read.All permission granted
2. Microsoft Defender XDR with Security Exposure Management enabled
Results:
- PASS: No pending approvals for Critical Asset Management are found.
- FAIL: At least one asset classification has pending approvals.
"""
def execute(self) -> List[CheckReportM365]:
"""Execute the check for pending Critical Asset Management approvals.
Evaluates whether there are any pending Critical Asset Management
approvals that require administrator review.
Returns:
A list of reports containing the result of the check.
"""
findings = []
pending_approvals = defenderxdr_client.pending_cam_approvals
# API call failed - likely missing ThreatHunting.Read.All permission
if pending_approvals is None:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Critical Asset Management",
resource_id="criticalAssetManagement",
)
report.status = "FAIL"
report.status_extended = (
"Unable to query Critical Asset Management status. "
"Verify that ThreatHunting.Read.All permission is granted."
)
findings.append(report)
return findings
if not pending_approvals:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Critical Asset Management",
resource_id="criticalAssetManagement",
)
report.status = "PASS"
report.status_extended = "No pending approvals for Critical Asset Management classifications are found."
findings.append(report)
else:
for approval in pending_approvals:
report = CheckReportM365(
metadata=self.metadata(),
resource=approval,
resource_name=f"CAM Classification: {approval.classification}",
resource_id=f"cam/{approval.classification}",
)
report.status = "FAIL"
assets_summary = ", ".join(approval.assets[:5])
if len(approval.assets) > 5:
assets_summary += f" and {len(approval.assets) - 5} more"
report.status_extended = (
f"Critical Asset Management classification '{approval.classification}' "
f"has {approval.pending_count} asset(s) pending approval: {assets_summary}."
)
findings.append(report)
return findings

View File

@@ -1,39 +0,0 @@
{
"Provider": "m365",
"CheckID": "defenderxdr_endpoint_privileged_user_exposed_credentials",
"CheckTitle": "Privileged users do not have credentials exposed on vulnerable endpoints",
"CheckType": [],
"ServiceName": "defenderxdr",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "critical",
"ResourceType": "Exposure Management",
"ResourceGroup": "security",
"Description": "Privileged users may have authentication artifacts (CLI secrets, cookies, tokens) exposed on endpoints with high risk scores. Microsoft Defender XDR's Security Exposure Management detects when credentials from users with Entra ID privileged roles are present on vulnerable devices.",
"Risk": "Exposed credentials on vulnerable endpoints enable account takeover through stolen tokens or cookies, Conditional Access bypass via primary refresh tokens, lateral movement to sensitive resources, and persistence until tokens are explicitly revoked.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://learn.microsoft.com/en-us/security-exposure-management/prerequisites",
"https://learn.microsoft.com/en-us/defender-xdr/advanced-hunting-exposuregraphedges-table"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Navigate to Microsoft Defender portal at https://security.microsoft.com\n2. Go to Exposure Management > Attack surface > Attack paths\n3. Review the exposed credential findings for privileged users\n4. For each affected device, review the risk and exposure score in Device Inventory\n5. Remediate endpoint vulnerabilities and improve device security posture\n6. Revoke affected user sessions and rotate credentials\n7. Consider implementing Privileged Access Workstations (PAWs) for privileged users",
"Terraform": ""
},
"Recommendation": {
"Text": "Privileged users should only authenticate from secure, hardened devices with low exposure scores. Implement Privileged Access Workstations (PAWs) and enforce device compliance policies through Conditional Access to prevent credential exposure on vulnerable endpoints.",
"Url": "https://hub.prowler.com/check/defenderxdr_endpoint_privileged_user_exposed_credentials"
}
},
"Categories": [
"secrets",
"identity-access",
"e5"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check requires Microsoft Defender XDR with Security Exposure Management enabled. The ThreatHunting.Read.All permission is required to query the ExposureGraphEdges table via the Advanced Hunting API."
}

View File

@@ -1,148 +0,0 @@
"""Check for exposed credentials of privileged users in Defender XDR.
This check identifies privileged users whose authentication credentials
(CLI secrets, cookies, tokens) are exposed on vulnerable endpoints.
"""
from prowler.lib.check.models import Check, CheckReportM365
from prowler.providers.m365.services.defenderxdr.defenderxdr_client import (
defenderxdr_client,
)
class defenderxdr_endpoint_privileged_user_exposed_credentials(Check):
"""Check if privileged users have exposed credentials on endpoints.
This check queries Microsoft Defender XDR's ExposureGraphEdges
table via the Advanced Hunting API to identify privileged users whose
authentication artifacts (CLI secrets, user cookies, sensitive tokens)
are exposed on endpoints with high risk or exposure scores.
Prerequisites:
1. ThreatHunting.Read.All permission granted
2. Microsoft Defender for Endpoint (MDE) enabled and deployed on devices
Results:
- PASS: No exposed credentials found OR MDE enabled but no devices
- FAIL: Exposed credentials detected OR MDE not enabled (blind spot)
"""
def execute(self) -> list[CheckReportM365]:
"""Execute the check for exposed credentials of privileged users.
Returns:
List[CheckReportM365]: A list of reports with check results.
"""
findings = []
# Step 1: Check MDE status
mde_status = defenderxdr_client.mde_status
# API call failed - likely missing ThreatHunting.Read.All permission
if mde_status is None:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender XDR",
resource_id="mdeStatus",
)
report.status = "FAIL"
report.status_extended = (
"Unable to query Microsoft Defender XDR status. "
"Verify that ThreatHunting.Read.All permission is granted."
)
findings.append(report)
return findings
# MDE not enabled - this is a security blind spot
if mde_status == "not_enabled":
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender XDR",
resource_id="mdeStatus",
)
report.status = "FAIL"
report.status_extended = (
"Microsoft Defender for Endpoint is not enabled. "
"Without MDE there is no visibility into credential "
"exposure on endpoints."
)
findings.append(report)
return findings
# MDE enabled but no devices - PASS (no endpoints to evaluate)
if mde_status == "no_devices":
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender XDR",
resource_id="mdeDevices",
)
report.status = "PASS"
report.status_extended = (
"Microsoft Defender for Endpoint is enabled but no devices "
"are onboarded. No endpoints to evaluate for credential "
"exposure."
)
findings.append(report)
return findings
# Step 2: MDE is active with devices - check for exposed credentials
exposed_credentials = defenderxdr_client.exposed_credentials_privileged_users
# API call failed for exposed credentials query
if exposed_credentials is None:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender XDR",
resource_id="exposedCredentials",
)
report.status = "FAIL"
report.status_extended = (
"Unable to query Security Exposure Management for exposed "
"credentials. Verify that Security Exposure Management "
"is enabled."
)
findings.append(report)
return findings
# Found exposed credentials - report each one
if exposed_credentials:
for exposed_user in exposed_credentials:
report = CheckReportM365(
metadata=self.metadata(),
resource=exposed_user,
resource_name=exposed_user.target_node_name,
resource_id=(exposed_user.target_node_id or exposed_user.edge_id),
)
report.status = "FAIL"
credential_info = (
f" ({exposed_user.credential_type})"
if exposed_user.credential_type
else ""
)
report.status_extended = (
f"Privileged user {exposed_user.target_node_name} has "
f"exposed credentials{credential_info} on device "
f"{exposed_user.source_node_name}."
)
findings.append(report)
else:
# No exposed credentials found - full visibility, no risk detected
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender XDR Exposure Management",
resource_id="exposedCredentials",
)
report.status = "PASS"
report.status_extended = (
"No exposed credentials found for privileged users on "
"vulnerable endpoints."
)
findings.append(report)
return findings

View File

@@ -1,322 +0,0 @@
"""Microsoft Defender XDR service module.
This module provides access to Microsoft Defender XDR data
through the Microsoft Graph Security Advanced Hunting API.
"""
import asyncio
import json
from typing import Dict, List, Optional
from msgraph.generated.security.microsoft_graph_security_run_hunting_query.run_hunting_query_post_request_body import (
RunHuntingQueryPostRequestBody,
)
from pydantic.v1 import BaseModel
from prowler.lib.logger import logger
from prowler.providers.m365.lib.service.service import M365Service
from prowler.providers.m365.m365_provider import M365Provider
class DefenderXDR(M365Service):
"""Microsoft Defender XDR service class.
Provides access to Microsoft Defender XDR data through
the Microsoft Graph Security Advanced Hunting API.
This class handles endpoint security checks including:
- Device security posture
- Exposed credentials detection
- Vulnerability assessments
- Critical Asset Management approvals
Attributes:
mde_status: Status of MDE deployment
(None, "not_enabled", "no_devices", "active")
exposed_credentials_privileged_users: List of privileged users
with exposed credentials
pending_cam_approvals: List of pending Critical Asset Management
approvals (None if API error)
"""
def __init__(self, provider: M365Provider):
"""Initialize the DefenderXDR service client.
Args:
provider: The M365Provider instance for authentication.
"""
super().__init__(provider)
# MDE status: None = API error, "not_enabled" = table not found,
# "no_devices" = enabled but empty, "active" = has devices
self.mde_status: Optional[str] = None
# Check data
self.exposed_credentials_privileged_users: Optional[
List[ExposedCredentialPrivilegedUser]
] = []
self.pending_cam_approvals: Optional[List[PendingCAMApproval]] = []
loop = self._get_event_loop()
try:
(
self.mde_status,
self.exposed_credentials_privileged_users,
self.pending_cam_approvals,
) = loop.run_until_complete(
asyncio.gather(
self._check_mde_status(),
self._get_exposed_credentials_privileged_users(),
self._get_pending_cam_approvals(),
)
)
finally:
self._cleanup_event_loop(loop)
def _get_event_loop(self) -> asyncio.AbstractEventLoop:
"""Get or create an event loop for async operations."""
try:
loop = asyncio.get_running_loop()
if loop.is_running():
raise RuntimeError(
"Cannot initialize DefenderXDR service while event loop is running"
)
return loop
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
def _cleanup_event_loop(self, loop: asyncio.AbstractEventLoop) -> None:
"""Clean up the event loop if we created it."""
try:
if loop and not loop.is_running():
asyncio.set_event_loop(None)
loop.close()
except Exception as error:
# Best-effort cleanup: swallow errors but log them for diagnostics
logger.debug(f"DefenderXDR - Failed to clean up event loop: {error}")
async def _run_hunting_query(self, query: str) -> tuple[Optional[List[Dict]], bool]:
"""Execute an Advanced Hunting query using Microsoft Graph Security API.
Args:
query: The KQL (Kusto Query Language) query to execute.
Returns:
Tuple of (results, table_not_found):
- results: List of result dicts, empty list if no results,
None if API error.
- table_not_found: True if query failed because table
doesn't exist.
"""
try:
request_body = RunHuntingQueryPostRequestBody(query=query)
response = await self.client.security.microsoft_graph_security_run_hunting_query.post(
request_body
)
if not response or not response.results:
return [], False
results = [
row.additional_data
for row in response.results
if hasattr(row, "additional_data")
]
return results, False
except Exception as error:
error_message = str(error).lower()
if (
"failed to resolve table" in error_message
or "could not find table" in error_message
):
logger.warning(f"DefenderXDR - Table not found in query: {error}")
return [], True
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return None, False
async def _check_mde_status(self) -> Optional[str]:
"""Check Microsoft Defender for Endpoint status.
Returns:
- None: API call failed (permission issue)
- "not_enabled": DeviceInfo table doesn't exist (MDE not enabled)
- "no_devices": MDE enabled but no devices onboarded
- "active": MDE enabled with devices reporting
"""
logger.info("DefenderXDR - Checking MDE status...")
query = "DeviceInfo | summarize DeviceCount = count()"
results, table_not_found = await self._run_hunting_query(query)
if results is None:
return None
if table_not_found:
return "not_enabled"
if results and len(results) > 0:
device_count = results[0].get("DeviceCount", 0)
if device_count > 0:
return "active"
return "no_devices"
async def _get_exposed_credentials_privileged_users(
self,
) -> Optional[List["ExposedCredentialPrivilegedUser"]]:
"""Query for privileged users with exposed credentials.
Returns:
List of ExposedCredentialPrivilegedUser objects,
or None if API call failed.
"""
logger.info(
"DefenderXDR - Querying for exposed credentials of privileged users..."
)
query = """
ExposureGraphEdges
| where EdgeLabel == "hasCredentialsFor"
| where TargetNodeLabel == "user"
| extend targetCategories = parse_json(TargetNodeCategories)
| where targetCategories has "PrivilegedEntraIdRole" or targetCategories has "privileged"
| extend credentialType = tostring(parse_json(EdgeProperties).credentialType)
| project
EdgeId,
SourceNodeId,
SourceNodeName,
SourceNodeLabel,
TargetNodeId,
TargetNodeName,
TargetNodeLabel,
CredentialType = credentialType,
TargetCategories = TargetNodeCategories
"""
results, _ = await self._run_hunting_query(query)
if results is None:
return None
return [self._parse_exposed_credential(row) for row in results if row]
def _parse_exposed_credential(self, row: Dict) -> "ExposedCredentialPrivilegedUser":
"""Parse a single row into an ExposedCredentialPrivilegedUser."""
target_categories = row.get("TargetCategories", [])
if isinstance(target_categories, str):
try:
target_categories = json.loads(target_categories)
except (json.JSONDecodeError, ValueError):
target_categories = []
return ExposedCredentialPrivilegedUser(
edge_id=str(row.get("EdgeId", "")),
source_node_id=str(row.get("SourceNodeId", "")),
source_node_name=str(row.get("SourceNodeName", "Unknown")),
source_node_label=str(row.get("SourceNodeLabel", "")),
target_node_id=str(row.get("TargetNodeId", "")),
target_node_name=str(row.get("TargetNodeName", "Unknown")),
target_node_label=str(row.get("TargetNodeLabel", "")),
credential_type=str(row.get("CredentialType") or "Unknown"),
target_categories=target_categories,
)
async def _get_pending_cam_approvals(
self,
) -> Optional[List["PendingCAMApproval"]]:
"""Query for pending Critical Asset Management approvals.
Queries the ExposureGraphNodes table to find assets with low criticality
confidence scores that require administrator approval.
Returns:
List of PendingCAMApproval objects, or None if API call failed.
"""
logger.info(
"DefenderXDR - Querying for pending Critical Asset Management approvals..."
)
query = """
ExposureGraphNodes
| where isnotempty(parse_json(NodeProperties)['rawData']['criticalityConfidenceLow'])
| mv-expand parse_json(NodeProperties)['rawData']['criticalityConfidenceLow']
| extend Classification = tostring(NodeProperties_rawData_criticalityConfidenceLow)
| summarize PendingApproval = count(), Assets = array_sort_asc(make_set(NodeName)) by Classification
| sort by Classification asc
"""
results, _ = await self._run_hunting_query(query)
if results is None:
return None
pending_approvals = []
for row in results:
if not row:
continue
classification = row.get("Classification", "")
pending_count = int(row.get("PendingApproval", 0))
assets_raw = row.get("Assets", "[]")
if isinstance(assets_raw, str):
try:
assets = json.loads(assets_raw)
except (json.JSONDecodeError, ValueError):
assets = []
elif isinstance(assets_raw, list):
assets = assets_raw
else:
assets = []
pending_approvals.append(
PendingCAMApproval(
classification=classification,
pending_count=pending_count,
assets=assets,
)
)
return pending_approvals
class ExposedCredentialPrivilegedUser(BaseModel):
"""Model for exposed credential data of a privileged user.
Represents authentication credentials (CLI secrets, user cookies, tokens)
of privileged users that are exposed on vulnerable endpoints.
"""
edge_id: str
source_node_id: str
source_node_name: str
source_node_label: str
target_node_id: str
target_node_name: str
target_node_label: str
credential_type: Optional[str] = None
target_categories: list = []
class PendingCAMApproval(BaseModel):
"""Model for a pending Critical Asset Management approval classification.
Represents assets with low criticality confidence scores that require
security administrator review and approval.
Attributes:
classification: The asset classification name pending approval.
pending_count: The number of assets pending approval for this classification.
assets: List of asset names pending approval.
"""
classification: str
pending_count: int
assets: List[str]

View File

@@ -1,37 +0,0 @@
{
"Provider": "m365",
"CheckID": "entra_app_registration_no_unused_privileged_permissions",
"CheckTitle": "App registration has no unused privileged API permissions",
"CheckType": [],
"ServiceName": "entra",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "App Registration",
"ResourceGroup": "IAM",
"Description": "OAuth app registrations with privileged API permissions (High privilege level) that are not being actively used. Usage status is determined by Microsoft Defender for Cloud Apps App Governance.",
"Risk": "Unused privileged permissions expand the attack surface. If a compromised app has dormant privileged permissions, attackers can exploit them for **privilege escalation**, **unauthorized access** to sensitive data, or **lateral movement** within the environment.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://learn.microsoft.com/en-us/defender-cloud-apps/app-governance-visibility-insights-overview",
"https://learn.microsoft.com/en-us/defender-xdr/advanced-hunting-oauthappinfo-table"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Navigate to Microsoft Defender XDR portal (https://security.microsoft.com)\n2. Go to Cloud apps > App governance > Overview\n3. Review the Applications inventory for apps with unused permissions\n4. For each flagged app, view details and navigate to the Permissions tab\n5. Remove unnecessary permissions via Microsoft Entra admin center",
"Terraform": ""
},
"Recommendation": {
"Text": "Apply the **principle of least privilege** by regularly reviewing and revoking unused privileged permissions from app registrations. Use Microsoft Defender for Cloud Apps App Governance to monitor permission usage.",
"Url": "https://hub.prowler.com/check/entra_app_registration_no_unused_privileged_permissions"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check requires Microsoft Defender for Cloud Apps with App Governance enabled and ThreatHunting.Read.All permission. If App Governance data is unavailable, the check fails due to missing visibility."
}

View File

@@ -1,145 +0,0 @@
from prowler.lib.check.models import Check, CheckReportM365
from prowler.providers.m365.services.entra.entra_client import entra_client
class entra_app_registration_no_unused_privileged_permissions(Check):
"""
Ensure that app registrations do not have unused privileged API permissions.
This check evaluates OAuth applications registered in Microsoft Entra ID to identify
those with privileged API permissions (High privilege level or Control/Management Plane
classifications) that are assigned but not actively being used.
The check uses data from Microsoft Defender for Cloud Apps App Governance via
the OAuthAppInfo table in Defender XDR Advanced Hunting.
- PASS: The app has no unused privileged permissions.
- FAIL: The app has one or more unused privileged permissions that should be revoked.
It also fails when OAuth App Governance data is not available.
"""
# InUse field values from OAuthAppInfo:
# - "true" / "1" / "True" = permission is actively used
# - "false" / "0" / "False" = permission is NOT used (this triggers FAIL)
# - "Not supported" = Microsoft cannot determine usage
# - "" (empty) = No tracking data available
# Note: Microsoft is changing from numeric (1/0) to textual (True/False) on Feb 25, 2026
_UNUSED_STATUSES = {"false", "0", "notinuse", "not in use"}
_PRIVILEGED_PLANE_LABELS = ("control plane", "management plane")
def execute(self) -> list[CheckReportM365]:
"""
Execute the unused privileged permissions check for app registrations.
Iterates over OAuth applications retrieved from the Entra client and generates
reports indicating whether each app has unused privileged permissions.
Returns:
list[CheckReportM365]: A list of reports with the result of the check for each app.
"""
findings = []
# If OAuth app data is None, the API call failed (missing permissions or App Governance not enabled)
if entra_client.oauth_apps is None:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="OAuth Applications",
resource_id="oauthApps",
)
report.status = "FAIL"
report.status_extended = (
"OAuth App Governance data is unavailable. "
"Enable App Governance in Microsoft Defender for Cloud Apps and "
"grant ThreatHunting.Read.All to evaluate unused privileged permissions."
)
findings.append(report)
return findings
# If OAuth apps is empty dict, no apps are registered - this is compliant
if not entra_client.oauth_apps:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="OAuth Applications",
resource_id="oauthApps",
)
report.status = "PASS"
report.status_extended = (
"No OAuth applications are registered in the tenant."
)
findings.append(report)
return findings
# Check each OAuth app for unused privileged permissions
for app_id, app in entra_client.oauth_apps.items():
report = CheckReportM365(
metadata=self.metadata(),
resource=app,
resource_name=app.name,
resource_id=app_id,
)
# Find unused privileged permissions
# A permission is considered privileged if it has:
# - PrivilegeLevel == "High"
# Or if it's part of Control Plane / Management Plane (typically High privilege)
unused_privileged_permissions = []
for permission in app.permissions:
# Check if the permission is privileged
is_privileged = self._is_privileged_permission(permission)
# Check if the permission is unused
normalized_usage = self._normalize(permission.usage_status)
is_unused = normalized_usage in self._UNUSED_STATUSES
if is_privileged and is_unused:
unused_privileged_permissions.append(permission.name)
if unused_privileged_permissions:
# The app has unused privileged permissions
report.status = "FAIL"
# Truncate list to first 5 permissions for readability
total_count = len(unused_privileged_permissions)
if total_count > 5:
displayed = unused_privileged_permissions[:5]
permissions_list = ", ".join(displayed)
remaining = total_count - 5
permissions_list += f" (and {remaining} more)"
else:
permissions_list = ", ".join(unused_privileged_permissions)
report.status_extended = (
f"App registration {app.name} has {total_count} "
f"unused privileged permission(s): {permissions_list}."
)
else:
# The app has no unused privileged permissions
report.status = "PASS"
report.status_extended = (
f"App registration {app.name} has no unused privileged permissions."
)
findings.append(report)
return findings
@classmethod
def _is_privileged_permission(cls, permission) -> bool:
privilege_level = cls._normalize(permission.privilege_level)
permission_type = cls._normalize(permission.permission_type)
classification = cls._normalize(getattr(permission, "classification", ""))
if privilege_level == "high":
return True
return any(
label in permission_type or label in classification
for label in cls._PRIVILEGED_PLANE_LABELS
)
@staticmethod
def _normalize(value: str) -> str:
return (
value.lower().replace("_", " ").replace("-", " ").strip() if value else ""
)

View File

@@ -1,38 +0,0 @@
{
"Provider": "m365",
"CheckID": "entra_seamless_sso_disabled",
"CheckTitle": "Entra hybrid deployment does not have Seamless SSO enabled",
"CheckType": [],
"ServiceName": "entra",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Directory Sync Settings",
"ResourceGroup": "IAM",
"Description": "**Seamless Single Sign-On (SSO)** in hybrid Microsoft Entra deployments allows automatic authentication for domain-joined devices on the corporate network.\n\nThis check verifies the actual Seamless SSO configuration in directory synchronization settings. Modern devices with **Primary Refresh Token** (PRT) support no longer require Seamless SSO.",
"Risk": "Seamless SSO can be exploited for **lateral movement** between on-premises domains and Entra ID when an Entra Connect server is compromised. It can also be used to perform **brute force attacks** against Entra ID, as authentication through the AZUREADSSOACC account bypasses standard protections.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://learn.microsoft.com/en-us/entra/identity/hybrid/connect/how-to-connect-sso"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Open Microsoft Entra Connect configuration tool on the on-premises server.\n2. Navigate to **Change User Sign In**.\n3. Uncheck **Enable single sign-on**.\n4. Complete the configuration wizard.\n5. In Active Directory, run `Get-AzureADSSOStatus` to verify Seamless SSO shows `\"enable\":false`.\n6. Run `Disable-AzureADSSOForest` with domain admin credentials to remove the AZUREADSSOACC account.",
"Terraform": ""
},
"Recommendation": {
"Text": "Disable **Seamless SSO** in hybrid environments where modern devices support *Primary Refresh Token (PRT)*. Regularly audit Entra Connect settings and verify that the AZUREADSSOACC computer account is removed from Active Directory.",
"Url": "https://hub.prowler.com/check/entra_seamless_sso_disabled"
}
},
"Categories": [
"e3"
],
"DependsOn": [],
"RelatedTo": [
"entra_password_hash_sync_enabled"
],
"Notes": "Applies only to hybrid Microsoft Entra deployments using Entra Connect sync. The check reads the seamless_sso_enabled flag from the directory on-premises synchronization settings via Microsoft Graph API."
}

View File

@@ -1,82 +0,0 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportM365
from prowler.providers.m365.services.entra.entra_client import entra_client
class entra_seamless_sso_disabled(Check):
"""Check that Seamless Single Sign-On (SSO) is disabled for Microsoft Entra hybrid deployments.
Seamless SSO allows users to sign in without typing their passwords when on
corporate devices connected to the corporate network. When an Entra Connect server
is compromised, Seamless SSO can enable lateral movement between on-premises domains
and Entra ID, and it can also be exploited for brute force attacks. Modern devices with
Primary Refresh Token (PRT) support make this feature unnecessary for most organizations.
- PASS: Seamless SSO is disabled or on-premises sync is not enabled (cloud-only).
- FAIL: Seamless SSO is enabled in a hybrid deployment, or cannot verify due to insufficient permissions.
"""
def execute(self) -> List[CheckReportM365]:
"""Execute the Seamless SSO disabled check.
Checks the directory sync settings to determine if Seamless SSO is enabled.
For hybrid environments, this check verifies the actual Seamless SSO configuration
rather than inferring from on-premises sync status.
Returns:
A list of CheckReportM365 objects with the result of the check.
"""
findings = []
# Check if there was an error retrieving directory sync settings
if entra_client.directory_sync_error:
for organization in entra_client.organizations:
report = CheckReportM365(
self.metadata(),
resource=organization,
resource_id=organization.id,
resource_name=organization.name,
)
# Only FAIL for hybrid orgs; cloud-only orgs don't need this permission
if organization.on_premises_sync_enabled:
report.status = "FAIL"
report.status_extended = f"Cannot verify Seamless SSO status for {organization.name}: {entra_client.directory_sync_error}."
else:
report.status = "PASS"
report.status_extended = f"Entra organization {organization.name} is cloud-only (no on-premises sync), Seamless SSO is not applicable."
findings.append(report)
return findings
# Process directory sync settings if available
for sync_settings in entra_client.directory_sync_settings:
report = CheckReportM365(
self.metadata(),
resource=sync_settings,
resource_id=sync_settings.id,
resource_name=f"Directory Sync {sync_settings.id}",
)
if sync_settings.seamless_sso_enabled:
report.status = "FAIL"
report.status_extended = f"Entra directory sync {sync_settings.id} has Seamless SSO enabled, which can be exploited for lateral movement and brute force attacks."
else:
report.status = "PASS"
report.status_extended = f"Entra directory sync {sync_settings.id} has Seamless SSO disabled."
findings.append(report)
# If no directory sync settings and no error, it's a cloud-only tenant
if not entra_client.directory_sync_settings:
for organization in entra_client.organizations:
report = CheckReportM365(
self.metadata(),
resource=organization,
resource_id=organization.id,
resource_name=organization.name,
)
report.status = "PASS"
report.status_extended = f"Entra organization {organization.name} is cloud-only (no on-premises sync), Seamless SSO is not applicable."
findings.append(report)
return findings

View File

@@ -1,14 +1,9 @@
import asyncio
import json
from asyncio import gather
from enum import Enum
from typing import Dict, List, Optional
from typing import List, Optional
from uuid import UUID
from msgraph.generated.models.o_data_errors.o_data_error import ODataError
from msgraph.generated.security.microsoft_graph_security_run_hunting_query.run_hunting_query_post_request_body import (
RunHuntingQueryPostRequestBody,
)
from pydantic.v1 import BaseModel
from prowler.lib.logger import logger
@@ -17,33 +12,7 @@ from prowler.providers.m365.m365_provider import M365Provider
class Entra(M365Service):
"""
Microsoft Entra ID service class.
This class provides methods to retrieve and manage Microsoft Entra ID
security policies and configurations, including authorization policies,
conditional access policies, admin consent policies, groups, organizations,
users, and OAuth application data from Defender XDR.
Attributes:
tenant_domain (str): The tenant domain.
authorization_policy (AuthorizationPolicy): The authorization policy.
conditional_access_policies (dict): Dictionary of conditional access policies.
admin_consent_policy (AdminConsentPolicy): The admin consent policy.
groups (list): List of groups.
organizations (list): List of organizations.
users (dict): Dictionary of users.
user_accounts_status (dict): Dictionary of user account statuses.
oauth_apps (dict): Dictionary of OAuth applications from Defender XDR.
"""
def __init__(self, provider: M365Provider):
"""
Initialize the Entra service client.
Args:
provider: The M365Provider instance for authentication and configuration.
"""
super().__init__(provider)
if self.powershell:
@@ -78,8 +47,6 @@ class Entra(M365Service):
self._get_groups(),
self._get_organization(),
self._get_users(),
self._get_oauth_apps(),
self._get_directory_sync_settings(),
)
)
@@ -89,8 +56,6 @@ class Entra(M365Service):
self.groups = attributes[3]
self.organizations = attributes[4]
self.users = attributes[5]
self.oauth_apps: Optional[Dict[str, OAuthApp]] = attributes[6]
self.directory_sync_settings, self.directory_sync_error = attributes[7]
self.user_accounts_status = {}
if created_loop:
@@ -414,57 +379,6 @@ class Entra(M365Service):
return organizations
async def _get_directory_sync_settings(self):
"""Retrieve on-premises directory synchronization settings.
Fetches the directory synchronization configuration from Microsoft Graph API
to determine the state of synchronization features such as password sync,
device writeback, and other hybrid identity settings.
Returns:
A tuple containing:
- A list of DirectorySyncSettings objects, or an empty list if retrieval fails.
- An error message string if there was an access error, None otherwise.
"""
logger.info("Entra - Getting directory sync settings...")
directory_sync_settings = []
error_message = None
try:
sync_data = await self.client.directory.on_premises_synchronization.get()
for sync in getattr(sync_data, "value", []) or []:
features = getattr(sync, "features", None)
directory_sync_settings.append(
DirectorySyncSettings(
id=sync.id,
password_sync_enabled=getattr(
features, "password_sync_enabled", False
)
or False,
seamless_sso_enabled=getattr(
features, "seamless_sso_enabled", False
)
or False,
)
)
except ODataError as error:
error_code = getattr(error.error, "code", None) if error.error else None
if error_code == "Authorization_RequestDenied":
error_message = "Insufficient privileges to read directory sync settings. Required permission: OnPremDirectorySynchronization.Read.All or OnPremDirectorySynchronization.ReadWrite.All"
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error_message}"
)
else:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
error_message = str(error)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
error_message = str(error)
return directory_sync_settings, error_message
async def _get_users(self):
logger.info("Entra - Getting users...")
users = {}
@@ -547,122 +461,6 @@ class Entra(M365Service):
return registration_details
async def _get_oauth_apps(self) -> Optional[Dict[str, "OAuthApp"]]:
"""
Retrieve OAuth applications from Defender XDR using Advanced Hunting.
This method queries the OAuthAppInfo table to get information about
OAuth applications registered in the tenant, including their permissions
and usage status.
Returns:
Optional[Dict[str, OAuthApp]]: Dictionary of OAuth applications keyed by app ID,
or None if the API call failed (missing permissions or App Governance not enabled).
"""
logger.info("Entra - Getting OAuth apps from Defender XDR...")
oauth_apps: Optional[Dict[str, OAuthApp]] = {}
try:
# Query the OAuthAppInfo table using Advanced Hunting
# The query gets apps with their permissions including usage status
query = """
OAuthAppInfo
| project OAuthAppId, AppName, AppStatus, PrivilegeLevel, Permissions,
ServicePrincipalId, IsAdminConsented, LastUsedTime, AppOrigin
"""
request_body = RunHuntingQueryPostRequestBody(query=query)
result = await self.client.security.microsoft_graph_security_run_hunting_query.post(
request_body
)
if result and result.results:
for row in result.results:
row_data = row.additional_data
raw_app_id = row_data.get("OAuthAppId", "")
# Convert to string in case API returns non-string type
app_id = str(raw_app_id) if raw_app_id else ""
if not app_id:
continue
# Parse the permissions array
# Permissions can be a list of JSON strings or a list of dicts
permissions = []
raw_permissions = row_data.get("Permissions", [])
if raw_permissions:
for perm in raw_permissions:
# Parse JSON string if needed
if isinstance(perm, str):
try:
perm = json.loads(perm)
except json.JSONDecodeError:
continue
if isinstance(perm, dict):
permissions.append(
OAuthAppPermission(
name=str(perm.get("PermissionValue", "")),
target_app_id=str(perm.get("TargetAppId", "")),
target_app_name=str(
perm.get("TargetAppDisplayName", "")
),
permission_type=str(
perm.get("PermissionType", "")
),
classification=str(
perm.get(
"Classification",
perm.get(
"PermissionClassification", ""
),
)
),
privilege_level=str(
perm.get("PrivilegeLevel", "")
),
usage_status=str(perm.get("InUse", "")),
)
)
# Convert values to strings to handle API returning non-string types
raw_service_principal_id = row_data.get("ServicePrincipalId", "")
service_principal_id = (
str(raw_service_principal_id)
if raw_service_principal_id
else ""
)
raw_last_used_time = row_data.get("LastUsedTime")
last_used_time = (
str(raw_last_used_time)
if raw_last_used_time is not None
else None
)
oauth_apps[app_id] = OAuthApp(
id=app_id,
name=str(row_data.get("AppName", "")),
status=str(row_data.get("AppStatus", "")),
privilege_level=str(row_data.get("PrivilegeLevel", "")),
permissions=permissions,
service_principal_id=service_principal_id,
is_admin_consented=bool(
row_data.get("IsAdminConsented", False)
),
last_used_time=last_used_time,
app_origin=str(row_data.get("AppOrigin", "")),
)
except Exception as error:
# Log the error and return None to indicate API failure
# This API requires ThreatHunting.Read.All permission and App Governance to be enabled
logger.warning(
f"Entra - Could not retrieve OAuth apps from Defender XDR. "
f"This requires ThreatHunting.Read.All permission and App Governance enabled. "
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return None
return oauth_apps
class ConditionalAccessPolicyState(Enum):
ENABLED = "enabled"
@@ -801,19 +599,6 @@ class Organization(BaseModel):
on_premises_sync_enabled: bool
class DirectorySyncSettings(BaseModel):
"""On-premises directory synchronization settings.
Represents the synchronization configuration for a tenant, including feature
flags that control hybrid identity behaviors such as password synchronization
and Seamless SSO.
"""
id: str
password_sync_enabled: bool = False
seamless_sso_enabled: bool = False
class Group(BaseModel):
id: str
name: str
@@ -866,53 +651,3 @@ class AuthPolicyRoles(Enum):
USER = UUID("a0b1b346-4d3e-4e8b-98f8-753987be4970")
GUEST_USER = UUID("10dae51f-b6af-4016-8d66-8c2a99b929b3")
GUEST_USER_ACCESS_RESTRICTED = UUID("2af84b1e-32c8-42b7-82bc-daa82404023b")
class OAuthAppPermission(BaseModel):
"""
Model for OAuth application permission.
Attributes:
name: The permission name.
target_app_id: The target application ID that provides this permission.
target_app_name: The target application display name.
permission_type: The type of permission (Application or Delegated).
classification: Optional plane classification (e.g. Control Plane, Management Plane).
privilege_level: The privilege level (High, Medium, Low).
usage_status: The usage status (InUse or NotInUse).
"""
name: str
target_app_id: str = ""
target_app_name: str = ""
permission_type: str = ""
classification: str = ""
privilege_level: str = ""
usage_status: str = ""
class OAuthApp(BaseModel):
"""
Model for OAuth application from Defender XDR.
Attributes:
id: The application ID.
name: The application display name.
status: The application status (Enabled, Disabled, etc.).
privilege_level: The overall privilege level of the app.
permissions: List of permissions assigned to the app.
service_principal_id: The service principal ID.
is_admin_consented: Whether the app has admin consent.
last_used_time: When the app was last used.
app_origin: Whether the app is internal or external.
"""
id: str
name: str
status: str = ""
privilege_level: str = ""
permissions: List[OAuthAppPermission] = []
service_principal_id: str = ""
is_admin_consented: bool = False
last_used_time: Optional[str] = None
app_origin: str = ""

View File

@@ -1,7 +1,7 @@
import sys
from io import StringIO
from mock import MagicMock, patch
from mock import patch
from prowler.config.config import prowler_version, timestamp
from prowler.lib.logger import logger
@@ -350,62 +350,6 @@ mongodbatlas_html_assessment_summary = """
</div>
</div>"""
image_registry_html_assessment_summary = """
<div class="col-md-2">
<div class="card">
<div class="card-header">
Image Assessment Summary
</div>
<ul class="list-group
list-group-flush">
<li class="list-group-item">
<b>Registry URL:</b> myregistry.io
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
Image Credentials
</div>
<ul class="list-group
list-group-flush">
<li class="list-group-item">
<b>Image authentication method:</b> Docker login
</li>
</ul>
</div>
</div>"""
image_list_html_assessment_summary = """
<div class="col-md-2">
<div class="card">
<div class="card-header">
Image Assessment Summary
</div>
<ul class="list-group
list-group-flush">
<li class="list-group-item">
<b>Images:</b> nginx:latest, alpine:3.18
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
Image Credentials
</div>
<ul class="list-group
list-group-flush">
<li class="list-group-item">
<b>Image authentication method:</b> No auth
</li>
</ul>
</div>
</div>"""
def get_aws_html_header(args: list) -> str:
"""
@@ -910,36 +854,6 @@ class TestHTML:
assert summary == mongodbatlas_html_assessment_summary
def test_image_get_assessment_summary_with_registry(self):
"""Test Image HTML assessment summary with registry URL."""
findings = [generate_finding_output()]
output = HTML(findings)
provider = MagicMock()
provider.type = "image"
provider.registry = "myregistry.io"
provider.images = ["nginx:latest", "alpine:3.18"]
provider.auth_method = "Docker login"
summary = output.get_assessment_summary(provider)
assert summary == image_registry_html_assessment_summary
def test_image_get_assessment_summary_with_images(self):
"""Test Image HTML assessment summary with image list."""
findings = [generate_finding_output()]
output = HTML(findings)
provider = MagicMock()
provider.type = "image"
provider.registry = None
provider.images = ["nginx:latest", "alpine:3.18"]
provider.auth_method = "No auth"
summary = output.get_assessment_summary(provider)
assert summary == image_list_html_assessment_summary
def test_process_markdown_bold_text(self):
"""Test that **text** is converted to <strong>text</strong>"""
test_text = "This is **bold text** and this is **also bold**"

View File

@@ -45,16 +45,8 @@ SAMPLE_UNKNOWN_SEVERITY_FINDING = {
"Description": "An issue with unknown severity.",
}
# Sample image SHA for testing (first 12 chars of a sha256 digest)
SAMPLE_IMAGE_SHA = "c1aabb73d233"
SAMPLE_IMAGE_ID = f"sha256:{SAMPLE_IMAGE_SHA}abcdef1234567890"
# Full Trivy JSON output structure with a single vulnerability
SAMPLE_TRIVY_IMAGE_OUTPUT = {
"Metadata": {
"ImageID": SAMPLE_IMAGE_ID,
"RepoDigests": [f"alpine@sha256:{SAMPLE_IMAGE_SHA}abcdef1234567890"],
},
"Results": [
{
"Target": "alpine:3.18 (alpine 3.18.0)",
@@ -63,15 +55,11 @@ SAMPLE_TRIVY_IMAGE_OUTPUT = {
"Secrets": [],
"Misconfigurations": [],
}
],
]
}
# Full Trivy JSON output with mixed finding types
SAMPLE_TRIVY_MULTI_TYPE_OUTPUT = {
"Metadata": {
"ImageID": SAMPLE_IMAGE_ID,
"RepoDigests": [f"myimage@sha256:{SAMPLE_IMAGE_SHA}abcdef1234567890"],
},
"Results": [
{
"Target": "myimage:latest (debian 12)",
@@ -80,36 +68,7 @@ SAMPLE_TRIVY_MULTI_TYPE_OUTPUT = {
"Secrets": [SAMPLE_SECRET_FINDING],
"Misconfigurations": [SAMPLE_MISCONFIGURATION_FINDING],
}
],
}
# Trivy output with only RepoDigests (no ImageID) for fallback testing
SAMPLE_TRIVY_REPO_DIGEST_ONLY_OUTPUT = {
"Metadata": {
"RepoDigests": ["alpine@sha256:e5f6g7h8i9j0abcdef1234567890"],
},
"Results": [
{
"Target": "alpine:3.18 (alpine 3.18.0)",
"Type": "alpine",
"Vulnerabilities": [SAMPLE_VULNERABILITY_FINDING],
"Secrets": [],
"Misconfigurations": [],
}
],
}
# Trivy output with no Metadata at all
SAMPLE_TRIVY_NO_METADATA_OUTPUT = {
"Results": [
{
"Target": "alpine:3.18 (alpine 3.18.0)",
"Type": "alpine",
"Vulnerabilities": [SAMPLE_VULNERABILITY_FINDING],
"Secrets": [],
"Misconfigurations": [],
}
],
]
}
@@ -131,13 +90,3 @@ def get_invalid_trivy_output():
def get_multi_type_trivy_output():
"""Return Trivy output with multiple finding types as string."""
return json.dumps(SAMPLE_TRIVY_MULTI_TYPE_OUTPUT)
def get_repo_digest_only_trivy_output():
"""Return Trivy output with only RepoDigests (no ImageID) as string."""
return json.dumps(SAMPLE_TRIVY_REPO_DIGEST_ONLY_OUTPUT)
def get_no_metadata_trivy_output():
"""Return Trivy output with no Metadata as string."""
return json.dumps(SAMPLE_TRIVY_NO_METADATA_OUTPUT)

View File

@@ -1,4 +1,3 @@
import os
import tempfile
from unittest import mock
from unittest.mock import MagicMock, patch
@@ -15,13 +14,11 @@ from prowler.providers.image.exceptions.exceptions import (
ImageListFileNotFoundError,
ImageListFileReadError,
ImageNoImagesProvidedError,
ImageRegistryAuthError,
ImageScanError,
ImageTrivyBinaryNotFoundError,
)
from prowler.providers.image.image_provider import ImageProvider
from tests.providers.image.image_fixtures import (
SAMPLE_IMAGE_SHA,
SAMPLE_MISCONFIGURATION_FINDING,
SAMPLE_SECRET_FINDING,
SAMPLE_UNKNOWN_SEVERITY_FINDING,
@@ -29,8 +26,6 @@ from tests.providers.image.image_fixtures import (
get_empty_trivy_output,
get_invalid_trivy_output,
get_multi_type_trivy_output,
get_no_metadata_trivy_output,
get_repo_digest_only_trivy_output,
get_sample_trivy_json_output,
)
@@ -124,27 +119,22 @@ class TestImageProvider:
provider = _make_provider()
report = provider._process_finding(
SAMPLE_VULNERABILITY_FINDING,
"alpine:3.18",
"alpine:3.18 (alpine 3.18.0)",
image_sha="c1aabb73d233",
"alpine",
)
assert isinstance(report, CheckReportImage)
assert report.status == "FAIL"
assert report.check_metadata.CheckID == "CVE-2024-1234"
assert report.check_metadata.Severity == "high"
assert report.check_metadata.ServiceName == "container-image"
assert report.check_metadata.ServiceName == "alpine"
assert report.check_metadata.ResourceType == "container-image"
assert report.check_metadata.ResourceGroup == "container"
assert report.package_name == "openssl"
assert report.installed_version == "1.1.1k-r0"
assert report.fixed_version == "1.1.1l-r0"
assert report.resource_name == "alpine:3.18"
assert report.image_sha == "c1aabb73d233"
assert report.resource_details == "alpine:3.18 (alpine 3.18.0)"
assert report.resource_name == "alpine:3.18 (alpine 3.18.0)"
assert report.region == "container"
assert report.check_metadata.Categories == ["vulnerability"]
assert report.check_metadata.RelatedUrl == ""
def test_process_finding_secret(self):
"""Test processing a secret finding (identified by RuleID)."""
@@ -152,15 +142,14 @@ class TestImageProvider:
report = provider._process_finding(
SAMPLE_SECRET_FINDING,
"myimage:latest",
"myimage:latest (debian 12)",
"secret",
)
assert isinstance(report, CheckReportImage)
assert report.status == "FAIL"
assert report.check_metadata.CheckID == "aws-access-key-id"
assert report.check_metadata.Severity == "critical"
assert report.check_metadata.ServiceName == "container-image"
assert report.check_metadata.Categories == ["secrets"]
assert report.check_metadata.ServiceName == "secret"
def test_process_finding_misconfiguration(self):
"""Test processing a misconfiguration finding (identified by ID)."""
@@ -168,14 +157,13 @@ class TestImageProvider:
report = provider._process_finding(
SAMPLE_MISCONFIGURATION_FINDING,
"myimage:latest",
"myimage:latest (debian 12)",
"misconfiguration",
)
assert isinstance(report, CheckReportImage)
assert report.check_metadata.CheckID == "DS001"
assert report.check_metadata.Severity == "medium"
assert report.check_metadata.ServiceName == "container-image"
assert report.check_metadata.Categories == []
assert report.check_metadata.ServiceName == "misconfiguration"
def test_process_finding_unknown_severity(self):
"""Test that UNKNOWN severity is mapped to informational."""
@@ -183,7 +171,7 @@ class TestImageProvider:
report = provider._process_finding(
SAMPLE_UNKNOWN_SEVERITY_FINDING,
"myimage:latest",
"myimage:latest (alpine 3.18.0)",
"alpine",
)
assert report.check_metadata.Severity == "informational"
@@ -202,9 +190,6 @@ class TestImageProvider:
assert len(reports) == 1
assert reports[0].check_metadata.CheckID == "CVE-2024-1234"
assert reports[0].image_sha == SAMPLE_IMAGE_SHA
assert reports[0].resource_name == "alpine:3.18"
assert reports[0].check_metadata.ServiceName == "container-image"
@patch("subprocess.run")
def test_run_scan_empty_output(self, mock_subprocess):
@@ -289,23 +274,20 @@ class TestImageProvider:
)
assert "alpine:3.18" in output
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_test_connection_success(self, mock_factory):
@patch("subprocess.run")
def test_test_connection_success(self, mock_subprocess):
"""Test successful connection returns is_connected=True."""
mock_adapter = MagicMock()
mock_adapter.list_tags.return_value = ["3.18", "latest"]
mock_factory.return_value = mock_adapter
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
result = ImageProvider.test_connection(image="alpine:3.18")
assert result.is_connected is True
mock_adapter.list_tags.assert_called_once_with("library/alpine")
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_test_connection_auth_failure(self, mock_factory):
"""Test registry auth error returns auth failure."""
mock_factory.return_value = MagicMock(
list_tags=MagicMock(side_effect=ImageRegistryAuthError(file=__file__))
@patch("subprocess.run")
def test_test_connection_auth_failure(self, mock_subprocess):
"""Test 401 error returns auth failure."""
mock_subprocess.return_value = MagicMock(
returncode=1, stderr="401 unauthorized"
)
result = ImageProvider.test_connection(image="private/image:latest")
@@ -313,36 +295,16 @@ class TestImageProvider:
assert result.is_connected is False
assert "Authentication failed" in result.error
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_test_connection_not_found(self, mock_factory):
"""Test tag not found returns not found error."""
mock_adapter = MagicMock()
mock_adapter.list_tags.return_value = ["v1", "v2"]
mock_factory.return_value = mock_adapter
@patch("subprocess.run")
def test_test_connection_not_found(self, mock_subprocess):
"""Test 404 error returns not found."""
mock_subprocess.return_value = MagicMock(returncode=1, stderr="404 not found")
result = ImageProvider.test_connection(image="nonexistent/image:latest")
assert result.is_connected is False
assert "not found" in result.error
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_test_connection_registry_url(self, mock_factory):
"""Test registry URL (namespace) uses list_repositories."""
mock_adapter = MagicMock()
mock_adapter.list_repositories.return_value = ["andoniaf/myapp"]
mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(image="docker.io/andoniaf")
assert result.is_connected is True
mock_factory.assert_called_once_with(
registry_url="docker.io/andoniaf",
username=None,
password=None,
token=None,
)
mock_adapter.list_repositories.assert_called_once()
def test_build_status_extended(self):
"""Test status message content for different finding types."""
provider = _make_provider()
@@ -427,51 +389,6 @@ class TestImageProvider:
for _ in provider._scan_single_image("private/image:latest"):
pass
@patch("subprocess.run")
def test_sha_extraction_from_image_id(self, mock_subprocess):
"""Test that image_sha is extracted from Trivy Metadata.ImageID."""
provider = _make_provider()
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
reports = []
for batch in provider._scan_single_image("alpine:3.18"):
reports.extend(batch)
assert len(reports) == 1
assert reports[0].image_sha == SAMPLE_IMAGE_SHA
@patch("subprocess.run")
def test_sha_extraction_fallback_to_repo_digests(self, mock_subprocess):
"""Test that image_sha falls back to RepoDigests when ImageID is absent."""
provider = _make_provider()
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_repo_digest_only_trivy_output(), stderr=""
)
reports = []
for batch in provider._scan_single_image("alpine:3.18"):
reports.extend(batch)
assert len(reports) == 1
assert reports[0].image_sha == "e5f6g7h8i9j0"
@patch("subprocess.run")
def test_sha_extraction_no_metadata(self, mock_subprocess):
"""Test that image_sha is empty when no Metadata is present."""
provider = _make_provider()
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_no_metadata_trivy_output(), stderr=""
)
reports = []
for batch in provider._scan_single_image("alpine:3.18"):
reports.extend(batch)
assert len(reports) == 1
assert reports[0].image_sha == ""
@patch("subprocess.run")
def test_run_scan_propagates_scan_error(self, mock_subprocess):
"""Test that run_scan() re-raises ImageScanError instead of swallowing it."""
@@ -487,254 +404,6 @@ class TestImageProvider:
pass
class TestImageProviderRegistryAuth:
def test_no_auth_by_default(self):
"""Test that no auth is set when no credentials are provided."""
provider = _make_provider()
assert provider.registry_username is None
assert provider.registry_password is None
assert provider.registry_token is None
assert provider.auth_method == "No auth"
def test_basic_auth_with_explicit_params(self):
"""Test basic auth via explicit constructor params."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
assert provider.registry_username == "myuser"
assert provider.registry_password == "mypass"
assert provider.auth_method == "Docker login"
def test_token_auth_with_explicit_param(self):
"""Test token auth via explicit constructor param."""
provider = _make_provider(registry_token="my-token-123")
assert provider.registry_token == "my-token-123"
assert provider.auth_method == "Registry token"
def test_basic_auth_takes_precedence_over_token(self):
"""Test that username/password takes precedence over token."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
registry_token="my-token",
)
assert provider.auth_method == "Docker login"
@patch.dict(
os.environ, {"REGISTRY_USERNAME": "envuser", "REGISTRY_PASSWORD": "envpass"}
)
def test_basic_auth_from_env_vars(self):
"""Test that env vars are used as fallback for basic auth."""
provider = _make_provider()
assert provider.registry_username == "envuser"
assert provider.registry_password == "envpass"
assert provider.auth_method == "Docker login"
@patch.dict(os.environ, {"REGISTRY_TOKEN": "env-token"})
def test_token_auth_from_env_var(self):
"""Test that env var is used as fallback for token auth."""
provider = _make_provider()
assert provider.registry_token == "env-token"
assert provider.auth_method == "Registry token"
@patch.dict(
os.environ, {"REGISTRY_USERNAME": "envuser", "REGISTRY_PASSWORD": "envpass"}
)
def test_explicit_params_override_env_vars(self):
"""Test that explicit params take precedence over env vars."""
provider = _make_provider(
registry_username="explicit",
registry_password="explicit-pass",
)
assert provider.registry_username == "explicit"
assert provider.registry_password == "explicit-pass"
def test_build_trivy_env_no_auth(self):
"""Test that _build_trivy_env returns base env when no auth."""
provider = _make_provider()
env = provider._build_trivy_env()
assert "TRIVY_USERNAME" not in env
assert "TRIVY_PASSWORD" not in env
assert "TRIVY_REGISTRY_TOKEN" not in env
def test_build_trivy_env_basic_auth_sets_env_vars(self):
"""Test that _build_trivy_env injects TRIVY_USERNAME/PASSWORD for native Trivy auth."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
env = provider._build_trivy_env()
assert env["TRIVY_USERNAME"] == "myuser"
assert env["TRIVY_PASSWORD"] == "mypass"
def test_build_trivy_env_token_auth(self):
"""Test that _build_trivy_env injects registry token."""
provider = _make_provider(registry_token="my-token")
env = provider._build_trivy_env()
assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
@patch("subprocess.run")
def test_execute_trivy_sets_trivy_env_with_basic_auth(self, mock_subprocess):
"""Test that _execute_trivy sets TRIVY_USERNAME/PASSWORD for native Trivy auth."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
provider._execute_trivy(["trivy", "image", "alpine:3.18"], "alpine:3.18")
call_kwargs = mock_subprocess.call_args
env = call_kwargs.kwargs.get("env") or call_kwargs[1].get("env")
assert env["TRIVY_USERNAME"] == "myuser"
assert env["TRIVY_PASSWORD"] == "mypass"
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_test_connection_with_basic_auth(self, mock_factory):
"""Test test_connection passes credentials to the registry adapter."""
mock_adapter = MagicMock()
mock_adapter.list_tags.return_value = ["v1"]
mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(
image="private.registry.io/myapp:v1",
registry_username="myuser",
registry_password="mypass",
)
assert result.is_connected is True
mock_factory.assert_called_once_with(
registry_url="private.registry.io",
username="myuser",
password="mypass",
token=None,
)
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_test_connection_with_token(self, mock_factory):
"""Test test_connection passes token to the registry adapter."""
mock_adapter = MagicMock()
mock_adapter.list_tags.return_value = ["v1"]
mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(
image="private.registry.io/myapp:v1",
registry_token="my-token",
)
assert result.is_connected is True
mock_factory.assert_called_once_with(
registry_url="private.registry.io",
username=None,
password=None,
token="my-token",
)
def test_print_credentials_shows_auth_method(self):
"""Test that print_credentials outputs the auth method."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
with mock.patch("builtins.print") as mock_print:
provider.print_credentials()
output = " ".join(
str(call.args[0]) for call in mock_print.call_args_list if call.args
)
assert "Docker login" in output
class TestExtractRegistry:
def test_docker_hub_simple(self):
assert ImageProvider._extract_registry("alpine:3.18") is None
def test_docker_hub_with_namespace(self):
assert ImageProvider._extract_registry("andoniaf/test-private:tag") is None
def test_ghcr(self):
assert ImageProvider._extract_registry("ghcr.io/user/image:tag") == "ghcr.io"
def test_ecr(self):
assert (
ImageProvider._extract_registry(
"123456789012.dkr.ecr.us-east-1.amazonaws.com/repo:tag"
)
== "123456789012.dkr.ecr.us-east-1.amazonaws.com"
)
def test_localhost_with_port(self):
assert (
ImageProvider._extract_registry("localhost:5000/myimage:latest")
== "localhost:5000"
)
def test_custom_registry_with_port(self):
assert (
ImageProvider._extract_registry("myregistry.io:5000/image:tag")
== "myregistry.io:5000"
)
def test_digest_reference(self):
assert (
ImageProvider._extract_registry("ghcr.io/user/image@sha256:abc123")
== "ghcr.io"
)
def test_bare_image_name(self):
assert ImageProvider._extract_registry("nginx") is None
class TestIsRegistryUrl:
def test_registry_url_with_namespace(self):
assert ImageProvider._is_registry_url("docker.io/andoniaf") is True
def test_registry_url_ghcr(self):
assert ImageProvider._is_registry_url("ghcr.io/org") is True
def test_image_ref_with_tag(self):
assert ImageProvider._is_registry_url("ghcr.io/user/image:tag") is False
def test_image_ref_with_repo(self):
assert ImageProvider._is_registry_url("ghcr.io/user/image") is False
def test_dockerhub_short_image(self):
assert ImageProvider._is_registry_url("alpine:3.18") is False
def test_dockerhub_with_namespace(self):
assert ImageProvider._is_registry_url("andoniaf/test:tag") is False
def test_bare_image_name(self):
assert ImageProvider._is_registry_url("nginx") is False
def test_localhost_namespace(self):
assert ImageProvider._is_registry_url("localhost:5000/myns") is True
def test_localhost_image_with_tag(self):
assert ImageProvider._is_registry_url("localhost:5000/myns/image:v1") is False
class TestCleanup:
def test_cleanup_idempotent(self):
"""Test cleanup is safe to call multiple times."""
provider = _make_provider()
provider.cleanup()
provider.cleanup()
class TestImageProviderInputValidation:
def test_invalid_timeout_format_raises_error(self):
"""Test that a non-matching timeout string raises ImageInvalidTimeoutError."""
@@ -924,67 +593,3 @@ class TestImageProviderNameValidation:
with pytest.raises(ImageListFileReadError):
_make_provider(images=None, image_list_file=file_path)
class TestScanPerImage:
@patch("subprocess.run")
def test_yields_per_image(self, mock_subprocess):
"""Test that scan_per_image yields (name, findings) per image."""
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
provider = _make_provider(images=["alpine:3.18", "nginx:latest"])
results = list(provider.scan_per_image())
assert len(results) == 2
for name, findings in results:
assert isinstance(name, str)
assert isinstance(findings, list)
assert all(isinstance(f, CheckReportImage) for f in findings)
@patch("subprocess.run")
def test_reraises_scan_error(self, mock_subprocess):
"""Test that ImageScanError propagates from scan_per_image."""
mock_subprocess.return_value = MagicMock(
returncode=1, stdout="", stderr="scan failed"
)
provider = _make_provider(images=["alpine:3.18"])
with pytest.raises(ImageScanError):
list(provider.scan_per_image())
@patch("subprocess.run")
def test_skips_generic_error(self, mock_subprocess):
"""Test that a generic RuntimeError in _scan_single_image yields empty findings and continues."""
def side_effect(cmd, **kwargs):
if "bad:image" in cmd:
raise RuntimeError("unexpected error")
return MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
mock_subprocess.side_effect = side_effect
provider = _make_provider(images=["bad:image", "alpine:3.18"])
results = list(provider.scan_per_image())
assert len(results) == 2
assert results[0][0] == "bad:image"
assert results[0][1] == []
assert results[1][0] == "alpine:3.18"
assert len(results[1][1]) > 0
@patch("subprocess.run")
def test_calls_cleanup(self, mock_subprocess):
"""Test that cleanup is called even after scan_per_image completes."""
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
provider = _make_provider(images=["alpine:3.18"])
with mock.patch.object(provider, "cleanup") as mock_cleanup:
list(provider.scan_per_image())
mock_cleanup.assert_called_once()

View File

@@ -1,223 +0,0 @@
from argparse import Namespace
from prowler.providers.image.lib.arguments.arguments import validate_arguments
class TestValidateArguments:
def test_no_source_fails(self):
args = Namespace(
images=[],
image_list_file=None,
registry=None,
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=False,
)
ok, msg = validate_arguments(args)
assert not ok
assert "--image" in msg
def test_image_only_passes(self):
args = Namespace(
images=["nginx:latest"],
image_list_file=None,
registry=None,
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=False,
)
ok, _ = validate_arguments(args)
assert ok
def test_image_list_only_passes(self):
args = Namespace(
images=[],
image_list_file="images.txt",
registry=None,
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=False,
)
ok, _ = validate_arguments(args)
assert ok
def test_registry_only_passes(self):
args = Namespace(
images=[],
image_list_file=None,
registry="myregistry.io",
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=False,
)
ok, _ = validate_arguments(args)
assert ok
def test_image_filter_without_registry_fails(self):
args = Namespace(
images=["nginx:latest"],
image_list_file=None,
registry=None,
image_filter="^prod",
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=False,
)
ok, msg = validate_arguments(args)
assert not ok
assert "--image-filter requires --registry" in msg
def test_tag_filter_without_registry_fails(self):
args = Namespace(
images=["nginx:latest"],
image_list_file=None,
registry=None,
image_filter=None,
tag_filter="^v",
max_images=0,
registry_insecure=False,
registry_list_images=False,
)
ok, msg = validate_arguments(args)
assert not ok
assert "--tag-filter requires --registry" in msg
def test_max_images_without_registry_fails(self):
args = Namespace(
images=["nginx:latest"],
image_list_file=None,
registry=None,
image_filter=None,
tag_filter=None,
max_images=50,
registry_insecure=False,
registry_list_images=False,
)
ok, msg = validate_arguments(args)
assert not ok
assert "--max-images requires --registry" in msg
def test_registry_insecure_without_registry_fails(self):
args = Namespace(
images=[],
image_list_file="i.txt",
registry=None,
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=True,
registry_list_images=False,
)
ok, msg = validate_arguments(args)
assert not ok
assert "--registry-insecure requires --registry" in msg
def test_docker_hub_no_namespace_fails(self):
args = Namespace(
images=[],
image_list_file=None,
registry="docker.io",
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=False,
)
ok, msg = validate_arguments(args)
assert not ok
assert "namespace" in msg.lower()
def test_docker_hub_with_namespace_passes(self):
args = Namespace(
images=[],
image_list_file=None,
registry="docker.io/myorg",
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=False,
)
ok, _ = validate_arguments(args)
assert ok
def test_docker_hub_https_no_namespace_fails(self):
args = Namespace(
images=[],
image_list_file=None,
registry="https://docker.io",
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=False,
)
ok, msg = validate_arguments(args)
assert not ok
assert "namespace" in msg.lower()
def test_registry_with_filters_passes(self):
args = Namespace(
images=[],
image_list_file=None,
registry="myregistry.io",
image_filter="^prod",
tag_filter="^v",
max_images=100,
registry_insecure=True,
registry_list_images=False,
)
ok, _ = validate_arguments(args)
assert ok
def test_registry_list_without_registry_fails(self):
args = Namespace(
images=["nginx:latest"],
image_list_file=None,
registry=None,
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=True,
)
ok, msg = validate_arguments(args)
assert not ok
assert "--registry-list requires --registry" in msg
def test_registry_list_with_registry_passes(self):
args = Namespace(
images=[],
image_list_file=None,
registry="myregistry.io",
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=True,
)
ok, _ = validate_arguments(args)
assert ok
def test_combined_registry_and_image_passes(self):
args = Namespace(
images=["nginx:latest"],
image_list_file=None,
registry="myregistry.io",
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=False,
)
ok, _ = validate_arguments(args)
assert ok

View File

@@ -1,243 +0,0 @@
from unittest.mock import MagicMock, patch
import pytest
import requests
from prowler.providers.image.exceptions.exceptions import (
ImageRegistryAuthError,
ImageRegistryCatalogError,
ImageRegistryNetworkError,
)
from prowler.providers.image.lib.registry.dockerhub_adapter import DockerHubAdapter
class TestDockerHubAdapterInit:
def test_extract_namespace_simple(self):
assert DockerHubAdapter._extract_namespace("docker.io/myorg") == "myorg"
def test_extract_namespace_https(self):
assert DockerHubAdapter._extract_namespace("https://docker.io/myorg") == "myorg"
def test_extract_namespace_registry1(self):
assert (
DockerHubAdapter._extract_namespace("registry-1.docker.io/myorg") == "myorg"
)
def test_extract_namespace_empty(self):
assert DockerHubAdapter._extract_namespace("docker.io") == ""
def test_extract_namespace_with_slash(self):
assert DockerHubAdapter._extract_namespace("docker.io/myorg/") == "myorg"
class TestDockerHubListRepositories:
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_list_repos(self, mock_request):
# Hub login (now goes through requests.request via _request_with_retry)
login_resp = MagicMock(status_code=200)
login_resp.json.return_value = {"token": "jwt"}
# Repo listing
repos_resp = MagicMock(status_code=200)
repos_resp.json.return_value = {
"results": [{"name": "app1"}, {"name": "app2"}],
"next": None,
}
mock_request.side_effect = [login_resp, repos_resp]
adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
repos = adapter.list_repositories()
assert repos == ["myorg/app1", "myorg/app2"]
def test_list_repos_no_namespace_raises(self):
adapter = DockerHubAdapter("docker.io")
with pytest.raises(ImageRegistryCatalogError, match="namespace"):
adapter.list_repositories()
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_list_repos_public_no_credentials(self, mock_request):
"""When no credentials are provided, use the public /v2/repositories/{ns}/ endpoint."""
repos_resp = MagicMock(status_code=200)
repos_resp.json.return_value = {
"results": [{"name": "repo1"}, {"name": "repo2"}],
"next": None,
}
mock_request.return_value = repos_resp
adapter = DockerHubAdapter("docker.io/publicns")
repos = adapter.list_repositories()
assert repos == ["publicns/repo1", "publicns/repo2"]
called_url = mock_request.call_args[0][1]
assert "/v2/repositories/publicns/" in called_url
assert "/v2/namespaces/" not in called_url
class TestDockerHubListTags:
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_list_tags(self, mock_request):
# Token exchange (now goes through requests.request via _request_with_retry)
token_resp = MagicMock(status_code=200)
token_resp.json.return_value = {"token": "registry-token"}
# Tag listing
tags_resp = MagicMock(status_code=200, headers={})
tags_resp.json.return_value = {"tags": ["latest", "v1.0"]}
mock_request.side_effect = [token_resp, tags_resp]
adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
tags = adapter.list_tags("myorg/myapp")
assert tags == ["latest", "v1.0"]
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_list_tags_auth_failure(self, mock_request):
# Token exchange
token_resp = MagicMock(status_code=200)
token_resp.json.return_value = {"token": "tok"}
# Tag listing returns 401
tags_resp = MagicMock(status_code=401)
mock_request.side_effect = [token_resp, tags_resp]
adapter = DockerHubAdapter("docker.io/myorg")
with pytest.raises(ImageRegistryAuthError):
adapter.list_tags("myorg/myapp")
class TestDockerHubLogin:
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_login_failure(self, mock_request):
resp = MagicMock(status_code=401, text="invalid credentials")
mock_request.return_value = resp
adapter = DockerHubAdapter("docker.io/myorg", username="bad", password="creds")
with pytest.raises(ImageRegistryAuthError, match="login failed"):
adapter._hub_login()
def test_login_skipped_without_credentials(self):
adapter = DockerHubAdapter("docker.io/myorg")
adapter._hub_login() # Should not raise
assert adapter._hub_jwt is None
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_login_401_includes_response_body(self, mock_request):
resp = MagicMock(
status_code=401, text='{"detail":"Incorrect authentication credentials"}'
)
mock_request.return_value = resp
adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
with pytest.raises(
ImageRegistryAuthError, match="Incorrect authentication credentials"
):
adapter._hub_login()
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_login_500_retried_then_raises_network_error(
self, mock_request, mock_sleep
):
mock_request.return_value = MagicMock(status_code=500)
adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
with pytest.raises(ImageRegistryNetworkError, match="Server error"):
adapter._hub_login()
assert mock_request.call_count == 3
class TestDockerHubRetry:
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_retry_on_429(self, mock_request, mock_sleep):
resp_429 = MagicMock(status_code=429)
resp_200 = MagicMock(status_code=200)
mock_request.side_effect = [resp_429, resp_200]
adapter = DockerHubAdapter("docker.io/myorg")
result = adapter._request_with_retry(
"GET", "https://hub.docker.com/v2/namespaces/myorg/repositories"
)
assert result.status_code == 200
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_connection_error_retries(self, mock_request, mock_sleep):
mock_request.side_effect = requests.exceptions.ConnectionError("fail")
adapter = DockerHubAdapter("docker.io/myorg")
with pytest.raises(ImageRegistryNetworkError):
adapter._request_with_retry("GET", "https://hub.docker.com")
assert mock_request.call_count == 3
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_retry_on_500(self, mock_request, mock_sleep):
resp_500 = MagicMock(status_code=500)
resp_200 = MagicMock(status_code=200)
mock_request.side_effect = [resp_500, resp_200]
adapter = DockerHubAdapter("docker.io/myorg")
result = adapter._request_with_retry("GET", "https://hub.docker.com")
assert result.status_code == 200
assert mock_request.call_count == 2
mock_sleep.assert_called_once()
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_retry_exhausted_on_500_raises_network_error(
self, mock_request, mock_sleep
):
mock_request.return_value = MagicMock(status_code=500)
adapter = DockerHubAdapter("docker.io/myorg")
with pytest.raises(
ImageRegistryNetworkError, match="Server error.*HTTP 500.*3 attempts"
):
adapter._request_with_retry("GET", "https://hub.docker.com")
assert mock_request.call_count == 3
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_4xx_not_retried(self, mock_request, mock_sleep):
mock_request.return_value = MagicMock(status_code=403)
adapter = DockerHubAdapter("docker.io/myorg")
result = adapter._request_with_retry("GET", "https://hub.docker.com")
assert result.status_code == 403
assert mock_request.call_count == 1
mock_sleep.assert_not_called()
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_request_sends_user_agent(self, mock_request):
mock_request.return_value = MagicMock(status_code=200)
adapter = DockerHubAdapter("docker.io/myorg")
adapter._request_with_retry("GET", "https://hub.docker.com")
_, kwargs = mock_request.call_args
from prowler.config.config import prowler_version
assert (
kwargs["headers"]["User-Agent"]
== f"Prowler/{prowler_version} (registry-adapter)"
)
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_retry_500_includes_response_body(self, mock_request, mock_sleep):
resp_500 = MagicMock(status_code=500, text="<html>Cloudflare error</html>")
mock_request.return_value = resp_500
adapter = DockerHubAdapter("docker.io/myorg")
with pytest.raises(ImageRegistryNetworkError, match="Cloudflare error"):
adapter._request_with_retry("GET", "https://hub.docker.com")
class TestDockerHubEmptyTokens:
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_empty_hub_jwt_raises(self, mock_request):
resp = MagicMock(status_code=200)
resp.json.return_value = {"token": ""}
mock_request.return_value = resp
adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
with pytest.raises(ImageRegistryAuthError, match="empty JWT"):
adapter._hub_login()
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_none_hub_jwt_raises(self, mock_request):
resp = MagicMock(status_code=200)
resp.json.return_value = {}
mock_request.return_value = resp
adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
with pytest.raises(ImageRegistryAuthError, match="empty JWT"):
adapter._hub_login()
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_empty_registry_token_raises(self, mock_request):
resp = MagicMock(status_code=200)
resp.json.return_value = {"token": ""}
mock_request.return_value = resp
adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
with pytest.raises(ImageRegistryAuthError, match="empty token"):
adapter._get_registry_token("myorg/myapp")

View File

@@ -1,34 +0,0 @@
from prowler.providers.image.lib.registry.dockerhub_adapter import DockerHubAdapter
from prowler.providers.image.lib.registry.factory import create_registry_adapter
from prowler.providers.image.lib.registry.oci_adapter import OciRegistryAdapter
class TestCreateRegistryAdapter:
def test_docker_hub_returns_dockerhub_adapter(self):
adapter = create_registry_adapter("docker.io/myorg")
assert isinstance(adapter, DockerHubAdapter)
def test_oci_returns_oci_adapter(self):
adapter = create_registry_adapter("myregistry.io")
assert isinstance(adapter, OciRegistryAdapter)
def test_ecr_returns_oci_adapter(self):
adapter = create_registry_adapter("123456789.dkr.ecr.us-east-1.amazonaws.com")
assert isinstance(adapter, OciRegistryAdapter)
def test_passes_credentials(self):
adapter = create_registry_adapter(
"myregistry.io",
username="user",
password="pass",
token="tok",
verify_ssl=False,
)
assert adapter.username == "user"
assert adapter.password == "pass"
assert adapter.token == "tok"
assert adapter.verify_ssl is False
def test_registry_1_docker_io(self):
adapter = create_registry_adapter("registry-1.docker.io/myorg")
assert isinstance(adapter, DockerHubAdapter)

View File

@@ -1,428 +0,0 @@
import base64
from unittest.mock import MagicMock, patch
import pytest
import requests
from prowler.providers.image.exceptions.exceptions import (
ImageRegistryAuthError,
ImageRegistryCatalogError,
ImageRegistryNetworkError,
)
from prowler.providers.image.lib.registry.oci_adapter import OciRegistryAdapter
class TestOciAdapterInit:
def test_normalise_url_adds_https(self):
adapter = OciRegistryAdapter("myregistry.io")
assert adapter._base_url == "https://myregistry.io"
def test_normalise_url_keeps_http(self):
adapter = OciRegistryAdapter("http://myregistry.io")
assert adapter._base_url == "http://myregistry.io"
def test_normalise_url_strips_trailing_slash(self):
adapter = OciRegistryAdapter("https://myregistry.io/")
assert adapter._base_url == "https://myregistry.io"
def test_stores_credentials(self):
adapter = OciRegistryAdapter(
"reg.io", username="u", password="p", token="t", verify_ssl=False
)
assert adapter.username == "u"
assert adapter.password == "p"
assert adapter.token == "t"
assert adapter.verify_ssl is False
class TestOciAdapterAuth:
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_ensure_auth_with_token(self, mock_request):
adapter = OciRegistryAdapter("reg.io", token="my-token")
adapter._ensure_auth()
assert adapter._bearer_token == "my-token"
mock_request.assert_not_called()
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_ensure_auth_anonymous_ok(self, mock_request):
resp = MagicMock(status_code=200)
mock_request.return_value = resp
adapter = OciRegistryAdapter("reg.io")
adapter._ensure_auth()
assert adapter._bearer_token is None
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_ensure_auth_bearer_challenge(self, mock_request):
ping_resp = MagicMock(
status_code=401,
headers={
"Www-Authenticate": 'Bearer realm="https://auth.example.com/token",service="registry"'
},
)
token_resp = MagicMock(status_code=200)
token_resp.json.return_value = {"token": "bearer-tok"}
mock_request.side_effect = [ping_resp, token_resp]
adapter = OciRegistryAdapter("reg.io", username="u", password="p")
adapter._ensure_auth()
assert adapter._bearer_token == "bearer-tok"
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_ensure_auth_403_raises(self, mock_request):
resp = MagicMock(status_code=403)
mock_request.return_value = resp
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryAuthError):
adapter._ensure_auth()
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_ensure_auth_basic_challenge_with_creds(self, mock_request):
ping_resp = MagicMock(
status_code=401,
headers={"Www-Authenticate": 'Basic realm="https://ecr.aws"'},
)
mock_request.return_value = ping_resp
adapter = OciRegistryAdapter("ecr.aws", username="AWS", password="tok")
adapter._ensure_auth()
assert adapter._basic_auth_verified is True
assert adapter._bearer_token is None
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_ensure_auth_basic_challenge_no_creds(self, mock_request):
ping_resp = MagicMock(
status_code=401,
headers={"Www-Authenticate": 'Basic realm="https://ecr.aws"'},
)
mock_request.return_value = ping_resp
adapter = OciRegistryAdapter("ecr.aws")
with pytest.raises(ImageRegistryAuthError):
adapter._ensure_auth()
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_basic_auth_used_in_requests(self, mock_request):
ping_resp = MagicMock(
status_code=401,
headers={"Www-Authenticate": 'Basic realm="https://ecr.aws"'},
)
catalog_resp = MagicMock(status_code=200, headers={})
catalog_resp.json.return_value = {"repositories": ["myapp"]}
mock_request.side_effect = [ping_resp, catalog_resp]
adapter = OciRegistryAdapter("ecr.aws", username="AWS", password="tok")
adapter._ensure_auth()
adapter._authed_request("GET", "https://ecr.aws/v2/_catalog")
# The catalog request should use Basic auth (auth kwarg), not Bearer header
call_kwargs = mock_request.call_args_list[1][1]
assert call_kwargs.get("auth") == ("AWS", "tok")
assert "Authorization" not in call_kwargs.get("headers", {})
def test_resolve_basic_credentials_decodes_base64_token(self):
raw_password = "real-jwt-password"
encoded = base64.b64encode(f"AWS:{raw_password}".encode()).decode()
adapter = OciRegistryAdapter("ecr.aws", username="AWS", password=encoded)
user, pwd = adapter._resolve_basic_credentials()
assert user == "AWS"
assert pwd == raw_password
def test_resolve_basic_credentials_passthrough_raw_password(self):
adapter = OciRegistryAdapter("ecr.aws", username="AWS", password="plain-pass")
user, pwd = adapter._resolve_basic_credentials()
assert user == "AWS"
assert pwd == "plain-pass"
def test_resolve_basic_credentials_passthrough_invalid_base64(self):
adapter = OciRegistryAdapter(
"ecr.aws", username="AWS", password="not!valid~base64"
)
user, pwd = adapter._resolve_basic_credentials()
assert user == "AWS"
assert pwd == "not!valid~base64"
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_basic_auth_decodes_ecr_token_in_request(self, mock_request):
raw_password = "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.abc"
encoded = base64.b64encode(f"AWS:{raw_password}".encode()).decode()
ping_resp = MagicMock(
status_code=401,
headers={"Www-Authenticate": 'Basic realm="https://ecr.aws"'},
)
catalog_resp = MagicMock(status_code=200, headers={})
catalog_resp.json.return_value = {"repositories": ["myapp"]}
mock_request.side_effect = [ping_resp, catalog_resp]
adapter = OciRegistryAdapter("ecr.aws", username="AWS", password=encoded)
adapter._ensure_auth()
adapter._authed_request("GET", "https://ecr.aws/v2/_catalog")
call_kwargs = mock_request.call_args_list[1][1]
assert call_kwargs.get("auth") == ("AWS", raw_password)
def test_resolve_basic_credentials_none_password(self):
adapter = OciRegistryAdapter("ecr.aws", username="AWS", password=None)
user, pwd = adapter._resolve_basic_credentials()
assert user == "AWS"
assert pwd is None
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_authed_request_retries_on_401_with_bearer(self, mock_request):
adapter = OciRegistryAdapter("reg.io", username="u", password="p")
adapter._bearer_token = "expired-token"
# First request: 401 (expired token)
resp_401 = MagicMock(status_code=401)
# _ensure_auth ping: 401 with bearer challenge
ping_resp = MagicMock(
status_code=401,
headers={
"Www-Authenticate": 'Bearer realm="https://auth.reg.io/token",service="registry"'
},
)
# Token exchange: success
token_resp = MagicMock(status_code=200)
token_resp.json.return_value = {"token": "new-token"}
# Second request: 200 (new token works)
resp_200 = MagicMock(status_code=200)
mock_request.side_effect = [resp_401, ping_resp, token_resp, resp_200]
result = adapter._authed_request("GET", "https://reg.io/v2/myapp/tags/list")
assert result.status_code == 200
assert adapter._bearer_token == "new-token"
assert mock_request.call_count == 4
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_authed_request_no_retry_on_401_without_bearer(self, mock_request):
adapter = OciRegistryAdapter("reg.io", username="u", password="p")
adapter._basic_auth_verified = True
# No bearer token — using basic auth
resp_401 = MagicMock(status_code=401)
mock_request.return_value = resp_401
result = adapter._authed_request("GET", "https://reg.io/v2/_catalog")
assert result.status_code == 401
# Should only be called once (no retry for basic auth)
assert mock_request.call_count == 1
class TestOciAdapterListRepositories:
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_list_repos_single_page(self, mock_request):
ping_resp = MagicMock(status_code=200)
catalog_resp = MagicMock(status_code=200, headers={})
catalog_resp.json.return_value = {
"repositories": ["app/frontend", "app/backend"]
}
mock_request.side_effect = [ping_resp, catalog_resp]
adapter = OciRegistryAdapter("reg.io")
repos = adapter.list_repositories()
assert repos == ["app/frontend", "app/backend"]
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_list_repos_paginated(self, mock_request):
ping_resp = MagicMock(status_code=200)
page1_resp = MagicMock(
status_code=200,
headers={"Link": '<https://reg.io/v2/_catalog?n=200&last=b>; rel="next"'},
)
page1_resp.json.return_value = {"repositories": ["a"]}
page2_resp = MagicMock(status_code=200, headers={})
page2_resp.json.return_value = {"repositories": ["b"]}
mock_request.side_effect = [ping_resp, page1_resp, page2_resp]
adapter = OciRegistryAdapter("reg.io")
repos = adapter.list_repositories()
assert repos == ["a", "b"]
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_list_repos_404_raises(self, mock_request):
ping_resp = MagicMock(status_code=200)
catalog_resp = MagicMock(status_code=404)
mock_request.side_effect = [ping_resp, catalog_resp]
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryCatalogError):
adapter.list_repositories()
class TestOciAdapterListTags:
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_list_tags(self, mock_request):
ping_resp = MagicMock(status_code=200)
tags_resp = MagicMock(status_code=200, headers={})
tags_resp.json.return_value = {"tags": ["latest", "v1.0"]}
mock_request.side_effect = [ping_resp, tags_resp]
adapter = OciRegistryAdapter("reg.io")
tags = adapter.list_tags("myapp")
assert tags == ["latest", "v1.0"]
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_list_tags_null_tags(self, mock_request):
ping_resp = MagicMock(status_code=200)
tags_resp = MagicMock(status_code=200, headers={})
tags_resp.json.return_value = {"tags": None}
mock_request.side_effect = [ping_resp, tags_resp]
adapter = OciRegistryAdapter("reg.io")
tags = adapter.list_tags("myapp")
assert tags == []
class TestOciAdapterRetry:
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_retry_on_429(self, mock_request, mock_sleep):
resp_429 = MagicMock(status_code=429)
resp_200 = MagicMock(status_code=200)
mock_request.side_effect = [resp_429, resp_200]
adapter = OciRegistryAdapter("reg.io")
result = adapter._request_with_retry("GET", "https://reg.io/v2/")
assert result.status_code == 200
mock_sleep.assert_called_once()
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_connection_error_retries(self, mock_request, mock_sleep):
mock_request.side_effect = requests.exceptions.ConnectionError("failed")
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryNetworkError):
adapter._request_with_retry("GET", "https://reg.io/v2/")
assert mock_request.call_count == 3
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_timeout_raises_immediately(self, mock_request):
mock_request.side_effect = requests.exceptions.Timeout("timeout")
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryNetworkError):
adapter._request_with_retry("GET", "https://reg.io/v2/")
assert mock_request.call_count == 1
class TestOciAdapterNextPageUrl:
def test_no_link_header(self):
resp = MagicMock(headers={})
assert OciRegistryAdapter._next_page_url(resp) is None
def test_link_header_with_next(self):
resp = MagicMock(
headers={"Link": '<https://reg.io/v2/_catalog?n=200&last=b>; rel="next"'}
)
assert (
OciRegistryAdapter._next_page_url(resp)
== "https://reg.io/v2/_catalog?n=200&last=b"
)
def test_link_header_relative_url(self):
resp = MagicMock(
headers={"Link": '</v2/_catalog?n=200&last=b>; rel="next"'},
url="https://reg.io/v2/_catalog?n=200",
)
assert (
OciRegistryAdapter._next_page_url(resp)
== "https://reg.io/v2/_catalog?n=200&last=b"
)
def test_link_header_no_next(self):
resp = MagicMock(
headers={"Link": '<https://reg.io/v2/_catalog?n=200>; rel="prev"'}
)
assert OciRegistryAdapter._next_page_url(resp) is None
class TestOciAdapterSSRF:
def test_reject_file_scheme(self):
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryAuthError, match="disallowed scheme"):
adapter._validate_realm_url("file:///etc/passwd")
def test_reject_ftp_scheme(self):
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryAuthError, match="disallowed scheme"):
adapter._validate_realm_url("ftp://evil.com/token")
def test_reject_private_ip(self):
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryAuthError, match="private/loopback"):
adapter._validate_realm_url("https://10.0.0.1/token")
def test_reject_loopback(self):
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryAuthError, match="private/loopback"):
adapter._validate_realm_url("https://127.0.0.1/token")
def test_reject_link_local(self):
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryAuthError, match="private/loopback"):
adapter._validate_realm_url("https://169.254.169.254/latest/meta-data")
def test_accept_public_https(self):
adapter = OciRegistryAdapter("reg.io")
# Should not raise
adapter._validate_realm_url("https://auth.example.com/token")
def test_accept_hostname_not_ip(self):
adapter = OciRegistryAdapter("reg.io")
# Hostnames (not IPs) should pass even if they resolve to private IPs
adapter._validate_realm_url("https://internal.corp.com/token")
class TestOciAdapterEmptyToken:
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_empty_bearer_token_raises(self, mock_request):
ping_resp = MagicMock(
status_code=401,
headers={
"Www-Authenticate": 'Bearer realm="https://auth.example.com/token",service="registry"'
},
)
token_resp = MagicMock(status_code=200)
token_resp.json.return_value = {"token": "", "access_token": ""}
mock_request.side_effect = [ping_resp, token_resp]
adapter = OciRegistryAdapter("reg.io", username="u", password="p")
with pytest.raises(ImageRegistryAuthError, match="empty token"):
adapter._ensure_auth()
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_none_bearer_token_raises(self, mock_request):
ping_resp = MagicMock(
status_code=401,
headers={
"Www-Authenticate": 'Bearer realm="https://auth.example.com/token",service="registry"'
},
)
token_resp = MagicMock(status_code=200)
token_resp.json.return_value = {}
mock_request.side_effect = [ping_resp, token_resp]
adapter = OciRegistryAdapter("reg.io", username="u", password="p")
with pytest.raises(ImageRegistryAuthError, match="empty token"):
adapter._ensure_auth()
class TestOciAdapterNarrowExcept:
def test_invalid_utf8_base64_falls_through(self):
# Create a base64 string that decodes to invalid UTF-8
invalid_bytes = base64.b64encode(b"\xff\xfe").decode()
adapter = OciRegistryAdapter("ecr.aws", username="AWS", password=invalid_bytes)
user, pwd = adapter._resolve_basic_credentials()
assert user == "AWS"
assert pwd == invalid_bytes
class TestCredentialRedaction:
def test_getstate_redacts_credentials(self):
adapter = OciRegistryAdapter(
"reg.io", username="u", password="secret", token="tok"
)
state = adapter.__getstate__()
assert state["_password"] == "***"
assert state["_token"] == "***"
assert state["username"] == "u"
assert state["registry_url"] == "reg.io"
def test_getstate_none_credentials(self):
adapter = OciRegistryAdapter("reg.io")
state = adapter.__getstate__()
assert state["_password"] is None
assert state["_token"] is None
def test_repr_redacts_credentials(self):
adapter = OciRegistryAdapter(
"reg.io", username="u", password="s3cret_pw", token="s3cret_tk"
)
r = repr(adapter)
assert "s3cret_pw" not in r
assert "s3cret_tk" not in r
assert "<redacted>" in r
def test_properties_still_work(self):
adapter = OciRegistryAdapter("reg.io", password="secret", token="tok")
assert adapter.password == "secret"
assert adapter.token == "tok"

View File

@@ -1,234 +0,0 @@
import os
from unittest.mock import MagicMock, patch
import pytest
from prowler.providers.image.exceptions.exceptions import (
ImageInvalidFilterError,
ImageMaxImagesExceededError,
)
from prowler.providers.image.image_provider import ImageProvider
from prowler.providers.image.lib.registry.dockerhub_adapter import DockerHubAdapter
_CLEAN_ENV = {
"PATH": os.environ.get("PATH", ""),
"HOME": os.environ.get("HOME", ""),
}
def _build_provider(**overrides):
defaults = dict(
images=[],
registry="myregistry.io",
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=False,
config_content={"image": {}},
)
defaults.update(overrides)
with patch.dict(os.environ, _CLEAN_ENV, clear=True):
return ImageProvider(**defaults)
class TestRegistryEnumeration:
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_enumerate_oci_registry(self, mock_factory):
adapter = MagicMock()
adapter.list_repositories.return_value = ["app/frontend", "app/backend"]
adapter.list_tags.side_effect = [["latest", "v1.0"], ["latest"]]
mock_factory.return_value = adapter
provider = _build_provider()
assert "myregistry.io/app/frontend:latest" in provider.images
assert "myregistry.io/app/frontend:v1.0" in provider.images
assert "myregistry.io/app/backend:latest" in provider.images
assert len(provider.images) == 3
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_image_filter(self, mock_factory):
adapter = MagicMock()
adapter.list_repositories.return_value = ["prod/app", "dev/app", "staging/app"]
adapter.list_tags.return_value = ["latest"]
mock_factory.return_value = adapter
provider = _build_provider(image_filter="^prod/")
assert len(provider.images) == 1
assert "myregistry.io/prod/app:latest" in provider.images
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_tag_filter(self, mock_factory):
adapter = MagicMock()
adapter.list_repositories.return_value = ["myapp"]
adapter.list_tags.return_value = ["latest", "v1.0", "v2.0", "dev-abc123"]
mock_factory.return_value = adapter
provider = _build_provider(tag_filter=r"^v\d+\.\d+$")
assert len(provider.images) == 2
assert "myregistry.io/myapp:v1.0" in provider.images
assert "myregistry.io/myapp:v2.0" in provider.images
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_combined_filters(self, mock_factory):
adapter = MagicMock()
adapter.list_repositories.return_value = ["prod/app", "dev/app"]
adapter.list_tags.return_value = ["latest", "v1.0"]
mock_factory.return_value = adapter
provider = _build_provider(image_filter="^prod/", tag_filter="^v")
assert len(provider.images) == 1
assert "myregistry.io/prod/app:v1.0" in provider.images
class TestMaxImages:
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_max_images_exceeded(self, mock_factory):
adapter = MagicMock()
adapter.list_repositories.return_value = ["app1", "app2", "app3"]
adapter.list_tags.return_value = ["latest", "v1.0"]
mock_factory.return_value = adapter
with pytest.raises(ImageMaxImagesExceededError):
_build_provider(max_images=2)
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_max_images_not_exceeded(self, mock_factory):
adapter = MagicMock()
adapter.list_repositories.return_value = ["app1"]
adapter.list_tags.return_value = ["latest"]
mock_factory.return_value = adapter
provider = _build_provider(max_images=10)
assert len(provider.images) == 1
class TestDeduplication:
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_deduplication_with_explicit_images(self, mock_factory):
adapter = MagicMock()
adapter.list_repositories.return_value = ["myapp"]
adapter.list_tags.return_value = ["latest"]
mock_factory.return_value = adapter
provider = _build_provider(images=["myregistry.io/myapp:latest"])
assert provider.images.count("myregistry.io/myapp:latest") == 1
class TestInvalidFilters:
def test_invalid_image_filter_regex(self):
with pytest.raises(ImageInvalidFilterError):
_build_provider(image_filter="[invalid")
def test_invalid_tag_filter_regex(self):
with pytest.raises(ImageInvalidFilterError):
_build_provider(tag_filter="(unclosed")
class TestRegistryInsecure:
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_insecure_passes_verify_false(self, mock_factory):
adapter = MagicMock()
adapter.list_repositories.return_value = ["app"]
adapter.list_tags.return_value = ["latest"]
mock_factory.return_value = adapter
_build_provider(registry_insecure=True)
mock_factory.assert_called_once()
call_kwargs = mock_factory.call_args[1]
assert call_kwargs["verify_ssl"] is False
class TestEmptyRegistry:
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_empty_catalog_with_explicit_images(self, mock_factory):
adapter = MagicMock()
adapter.list_repositories.return_value = []
mock_factory.return_value = adapter
provider = _build_provider(images=["nginx:latest"])
assert provider.images == ["nginx:latest"]
class TestRegistryList:
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_registry_list_prints_and_returns(self, mock_factory, capsys):
adapter = MagicMock()
adapter.list_repositories.return_value = ["app/frontend", "app/backend"]
adapter.list_tags.side_effect = [["latest", "v1.0"], ["latest"]]
mock_factory.return_value = adapter
provider = _build_provider(registry_list_images=True)
assert provider._listing_only is True
captured = capsys.readouterr()
assert "app/frontend" in captured.out
assert "app/backend" in captured.out
assert "latest" in captured.out
assert "v1.0" in captured.out
assert "2 repositories" in captured.out
assert "3 images" in captured.out
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_registry_list_respects_image_filter(self, mock_factory, capsys):
adapter = MagicMock()
adapter.list_repositories.return_value = ["prod/app", "dev/app"]
adapter.list_tags.return_value = ["latest"]
mock_factory.return_value = adapter
provider = _build_provider(registry_list_images=True, image_filter="^prod/")
assert provider._listing_only is True
captured = capsys.readouterr()
assert "prod/app" in captured.out
assert "dev/app" not in captured.out
assert "1 repository" in captured.out
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_registry_list_respects_tag_filter(self, mock_factory, capsys):
adapter = MagicMock()
adapter.list_repositories.return_value = ["myapp"]
adapter.list_tags.return_value = ["latest", "v1.0", "dev-abc"]
mock_factory.return_value = adapter
provider = _build_provider(registry_list_images=True, tag_filter=r"^v\d+\.\d+$")
assert provider._listing_only is True
captured = capsys.readouterr()
assert "v1.0" in captured.out
assert "dev-abc" not in captured.out
assert "1 image)" in captured.out
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_registry_list_skips_max_images(self, mock_factory, capsys):
adapter = MagicMock()
adapter.list_repositories.return_value = ["app1", "app2", "app3"]
adapter.list_tags.return_value = ["latest", "v1.0"]
mock_factory.return_value = adapter
# max_images=1 would normally raise, but --registry-list skips it
provider = _build_provider(registry_list_images=True, max_images=1)
assert provider._listing_only is True
captured = capsys.readouterr()
assert "6 images" in captured.out
class TestDockerHubEnumeration:
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_dockerhub_images_use_repo_tag_format(self, mock_factory):
"""Docker Hub images should use repo:tag format without host prefix."""
adapter = MagicMock(spec=DockerHubAdapter)
adapter.list_repositories.return_value = ["myorg/app1", "myorg/app2"]
adapter.list_tags.side_effect = [["latest", "v1.0"], ["latest"]]
mock_factory.return_value = adapter
provider = _build_provider(registry="docker.io/myorg")
# Docker Hub images should NOT have host prefix
assert "myorg/app1:latest" in provider.images
assert "myorg/app1:v1.0" in provider.images
assert "myorg/app2:latest" in provider.images
# Ensure no host prefix was added
for img in provider.images:
assert not img.startswith("docker.io/"), f"Unexpected host prefix in {img}"
assert len(provider.images) == 3

View File

@@ -1,646 +0,0 @@
from unittest import mock
from prowler.lib.check.models import Severity
from tests.providers.m365.m365_fixtures import DOMAIN, set_mocked_m365_provider
def create_mock_sensor():
"""Create a mock sensor for testing."""
from prowler.providers.m365.services.defenderidentity.defenderidentity_service import (
Sensor,
)
return Sensor(
id="test-sensor-id",
display_name="Test Sensor",
sensor_type="domainControllerIntegrated",
deployment_status="upToDate",
health_status="healthy",
open_health_issues_count=0,
domain_name="example.com",
version="2.200.0.0",
created_date_time="2024-01-01T00:00:00Z",
)
class Test_defenderidentity_health_issues_no_open:
def test_no_health_issues_with_sensors(self):
"""Test when there are no health issues but sensors are deployed: expected PASS."""
defenderidentity_client = mock.MagicMock()
defenderidentity_client.audited_tenant = "audited_tenant"
defenderidentity_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open.defenderidentity_client",
new=defenderidentity_client,
),
):
from prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open import (
defenderidentity_health_issues_no_open,
)
defenderidentity_client.health_issues = []
defenderidentity_client.sensors = [create_mock_sensor()]
check = defenderidentity_health_issues_no_open()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "No open health issues found in Defender for Identity."
)
assert result[0].resource == {}
assert result[0].resource_name == "Defender for Identity"
assert result[0].resource_id == "defenderIdentity"
def test_no_sensors_deployed(self):
"""Test when no sensors are deployed: expected FAIL."""
defenderidentity_client = mock.MagicMock()
defenderidentity_client.audited_tenant = "audited_tenant"
defenderidentity_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open.defenderidentity_client",
new=defenderidentity_client,
),
):
from prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open import (
defenderidentity_health_issues_no_open,
)
defenderidentity_client.health_issues = []
defenderidentity_client.sensors = []
check = defenderidentity_health_issues_no_open()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "No sensors deployed" in result[0].status_extended
assert result[0].resource == {}
assert result[0].resource_name == "Defender for Identity"
assert result[0].resource_id == "defenderIdentity"
def test_both_apis_failed(self):
"""Test when both sensors and health_issues APIs fail (None): expected FAIL with permission message."""
defenderidentity_client = mock.MagicMock()
defenderidentity_client.audited_tenant = "audited_tenant"
defenderidentity_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open.defenderidentity_client",
new=defenderidentity_client,
),
):
from prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open import (
defenderidentity_health_issues_no_open,
)
defenderidentity_client.health_issues = None
defenderidentity_client.sensors = None
check = defenderidentity_health_issues_no_open()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "APIs are not accessible" in result[0].status_extended
assert "SecurityIdentitiesSensors.Read.All" in result[0].status_extended
assert "SecurityIdentitiesHealth.Read.All" in result[0].status_extended
assert result[0].resource == {}
assert result[0].resource_name == "Defender for Identity"
assert result[0].resource_id == "defenderIdentity"
def test_health_issues_api_failed_but_sensors_exist(self):
"""Test when health_issues API fails but sensors exist: expected FAIL with specific permission message."""
defenderidentity_client = mock.MagicMock()
defenderidentity_client.audited_tenant = "audited_tenant"
defenderidentity_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open.defenderidentity_client",
new=defenderidentity_client,
),
):
from prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open import (
defenderidentity_health_issues_no_open,
)
defenderidentity_client.health_issues = None
defenderidentity_client.sensors = [create_mock_sensor()]
check = defenderidentity_health_issues_no_open()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "Cannot read health issues" in result[0].status_extended
assert "1 sensor(s) deployed" in result[0].status_extended
assert "SecurityIdentitiesHealth.Read.All" in result[0].status_extended
assert result[0].resource == {}
assert result[0].resource_name == "Defender for Identity"
assert result[0].resource_id == "defenderIdentity"
def test_health_issue_resolved(self):
"""Test when a health issue has been resolved (status is not open)."""
defenderidentity_client = mock.MagicMock()
defenderidentity_client.audited_tenant = "audited_tenant"
defenderidentity_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open.defenderidentity_client",
new=defenderidentity_client,
),
):
from prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open import (
defenderidentity_health_issues_no_open,
)
from prowler.providers.m365.services.defenderidentity.defenderidentity_service import (
HealthIssue,
)
health_issue_id = "test-health-issue-id-1"
health_issue_name = "Test Health Issue Resolved"
defenderidentity_client.sensors = [create_mock_sensor()]
defenderidentity_client.health_issues = [
HealthIssue(
id=health_issue_id,
display_name=health_issue_name,
description="A test health issue that has been resolved",
health_issue_type="sensor",
severity="medium",
status="closed",
created_date_time="2024-01-01T00:00:00Z",
last_modified_date_time="2024-01-02T00:00:00Z",
domain_names=["example.com"],
sensor_dns_names=["sensor1.example.com"],
issue_type_id="test-issue-type-1",
recommendations=["Fix the issue"],
additional_information=[],
)
]
check = defenderidentity_health_issues_no_open()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Defender for Identity sensor health issue {health_issue_name} is resolved."
)
assert result[0].resource_id == health_issue_id
assert result[0].resource_name == health_issue_name
def test_health_issue_open_high_severity(self):
"""Test when a health issue is open with high severity."""
defenderidentity_client = mock.MagicMock()
defenderidentity_client.audited_tenant = "audited_tenant"
defenderidentity_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open.defenderidentity_client",
new=defenderidentity_client,
),
):
from prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open import (
defenderidentity_health_issues_no_open,
)
from prowler.providers.m365.services.defenderidentity.defenderidentity_service import (
HealthIssue,
)
health_issue_id = "test-health-issue-id-2"
health_issue_name = "Critical Sensor Health Issue"
defenderidentity_client.sensors = [create_mock_sensor()]
defenderidentity_client.health_issues = [
HealthIssue(
id=health_issue_id,
display_name=health_issue_name,
description="A critical health issue that is open",
health_issue_type="global",
severity="high",
status="open",
created_date_time="2024-01-01T00:00:00Z",
last_modified_date_time="2024-01-02T00:00:00Z",
domain_names=["example.com"],
sensor_dns_names=[],
issue_type_id="test-issue-type-2",
recommendations=["Fix the critical issue immediately"],
additional_information=["Additional info about the issue"],
)
]
check = defenderidentity_health_issues_no_open()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Defender for Identity global health issue {health_issue_name} is open with high severity."
)
assert result[0].resource_id == health_issue_id
assert result[0].resource_name == health_issue_name
assert result[0].check_metadata.Severity == Severity.high
def test_health_issue_open_medium_severity(self):
"""Test when a health issue is open with medium severity."""
defenderidentity_client = mock.MagicMock()
defenderidentity_client.audited_tenant = "audited_tenant"
defenderidentity_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open.defenderidentity_client",
new=defenderidentity_client,
),
):
from prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open import (
defenderidentity_health_issues_no_open,
)
from prowler.providers.m365.services.defenderidentity.defenderidentity_service import (
HealthIssue,
)
health_issue_id = "test-health-issue-id-3"
health_issue_name = "Medium Severity Sensor Issue"
defenderidentity_client.sensors = [create_mock_sensor()]
defenderidentity_client.health_issues = [
HealthIssue(
id=health_issue_id,
display_name=health_issue_name,
description="A medium severity health issue",
health_issue_type="sensor",
severity="medium",
status="open",
created_date_time="2024-01-01T00:00:00Z",
last_modified_date_time="2024-01-02T00:00:00Z",
domain_names=["example.com"],
sensor_dns_names=["sensor2.example.com"],
issue_type_id="test-issue-type-3",
recommendations=["Review and fix the issue"],
additional_information=[],
)
]
check = defenderidentity_health_issues_no_open()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Defender for Identity sensor health issue {health_issue_name} is open with medium severity."
)
assert result[0].resource_id == health_issue_id
assert result[0].resource_name == health_issue_name
assert result[0].check_metadata.Severity == Severity.medium
def test_health_issue_open_low_severity(self):
"""Test when a health issue is open with low severity."""
defenderidentity_client = mock.MagicMock()
defenderidentity_client.audited_tenant = "audited_tenant"
defenderidentity_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open.defenderidentity_client",
new=defenderidentity_client,
),
):
from prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open import (
defenderidentity_health_issues_no_open,
)
from prowler.providers.m365.services.defenderidentity.defenderidentity_service import (
HealthIssue,
)
health_issue_id = "test-health-issue-id-4"
health_issue_name = "Low Severity Health Issue"
defenderidentity_client.sensors = [create_mock_sensor()]
defenderidentity_client.health_issues = [
HealthIssue(
id=health_issue_id,
display_name=health_issue_name,
description="A low severity health issue",
health_issue_type="global",
severity="low",
status="open",
created_date_time="2024-01-01T00:00:00Z",
last_modified_date_time="2024-01-02T00:00:00Z",
domain_names=["example.com"],
sensor_dns_names=[],
issue_type_id="test-issue-type-4",
recommendations=["Consider fixing the issue"],
additional_information=[],
)
]
check = defenderidentity_health_issues_no_open()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Defender for Identity global health issue {health_issue_name} is open with low severity."
)
assert result[0].resource_id == health_issue_id
assert result[0].resource_name == health_issue_name
assert result[0].check_metadata.Severity == Severity.low
def test_multiple_health_issues_mixed_status(self):
"""Test when there are multiple health issues with different statuses."""
defenderidentity_client = mock.MagicMock()
defenderidentity_client.audited_tenant = "audited_tenant"
defenderidentity_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open.defenderidentity_client",
new=defenderidentity_client,
),
):
from prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open import (
defenderidentity_health_issues_no_open,
)
from prowler.providers.m365.services.defenderidentity.defenderidentity_service import (
HealthIssue,
)
defenderidentity_client.sensors = [create_mock_sensor()]
defenderidentity_client.health_issues = [
HealthIssue(
id="issue-1",
display_name="Resolved Issue",
description="A resolved health issue",
health_issue_type="sensor",
severity="high",
status="closed",
created_date_time="2024-01-01T00:00:00Z",
last_modified_date_time="2024-01-02T00:00:00Z",
domain_names=["example.com"],
sensor_dns_names=["sensor1.example.com"],
issue_type_id="type-1",
recommendations=[],
additional_information=[],
),
HealthIssue(
id="issue-2",
display_name="Open Issue",
description="An open health issue",
health_issue_type="global",
severity="medium",
status="open",
created_date_time="2024-01-01T00:00:00Z",
last_modified_date_time="2024-01-02T00:00:00Z",
domain_names=["example.com"],
sensor_dns_names=[],
issue_type_id="type-2",
recommendations=["Fix this issue"],
additional_information=[],
),
]
check = defenderidentity_health_issues_no_open()
result = check.execute()
assert len(result) == 2
# First result should be PASS (resolved issue)
assert result[0].status == "PASS"
assert result[0].resource_id == "issue-1"
assert result[0].resource_name == "Resolved Issue"
assert (
result[0].status_extended
== "Defender for Identity sensor health issue Resolved Issue is resolved."
)
# Second result should be FAIL (open issue)
assert result[1].status == "FAIL"
assert result[1].resource_id == "issue-2"
assert result[1].resource_name == "Open Issue"
assert (
result[1].status_extended
== "Defender for Identity global health issue Open Issue is open with medium severity."
)
assert result[1].check_metadata.Severity == Severity.medium
def test_health_issue_with_unknown_type_and_severity(self):
"""Test when health issue has None/unknown type and severity."""
defenderidentity_client = mock.MagicMock()
defenderidentity_client.audited_tenant = "audited_tenant"
defenderidentity_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open.defenderidentity_client",
new=defenderidentity_client,
),
):
from prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open import (
defenderidentity_health_issues_no_open,
)
from prowler.providers.m365.services.defenderidentity.defenderidentity_service import (
HealthIssue,
)
health_issue_id = "test-health-issue-id-5"
health_issue_name = "Unknown Type Issue"
defenderidentity_client.sensors = [create_mock_sensor()]
defenderidentity_client.health_issues = [
HealthIssue(
id=health_issue_id,
display_name=health_issue_name,
description="A health issue with unknown type and severity",
health_issue_type=None,
severity=None,
status="open",
created_date_time="2024-01-01T00:00:00Z",
last_modified_date_time="2024-01-02T00:00:00Z",
domain_names=[],
sensor_dns_names=[],
issue_type_id="test-issue-type-5",
recommendations=[],
additional_information=[],
)
]
check = defenderidentity_health_issues_no_open()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Defender for Identity unknown health issue {health_issue_name} is open with unknown severity."
)
assert result[0].resource_id == health_issue_id
assert result[0].resource_name == health_issue_name
def test_health_issue_status_case_insensitive(self):
"""Test that status comparison is case insensitive (OPEN vs open)."""
defenderidentity_client = mock.MagicMock()
defenderidentity_client.audited_tenant = "audited_tenant"
defenderidentity_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open.defenderidentity_client",
new=defenderidentity_client,
),
):
from prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open import (
defenderidentity_health_issues_no_open,
)
from prowler.providers.m365.services.defenderidentity.defenderidentity_service import (
HealthIssue,
)
health_issue_id = "test-health-issue-id-6"
health_issue_name = "Uppercase Status Issue"
defenderidentity_client.sensors = [create_mock_sensor()]
defenderidentity_client.health_issues = [
HealthIssue(
id=health_issue_id,
display_name=health_issue_name,
description="A health issue with uppercase OPEN status",
health_issue_type="sensor",
severity="high",
status="OPEN",
created_date_time="2024-01-01T00:00:00Z",
last_modified_date_time="2024-01-02T00:00:00Z",
domain_names=["example.com"],
sensor_dns_names=["sensor.example.com"],
issue_type_id="test-issue-type-6",
recommendations=["Fix the issue"],
additional_information=[],
)
]
check = defenderidentity_health_issues_no_open()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Defender for Identity sensor health issue {health_issue_name} is open with high severity."
)
assert result[0].resource_id == health_issue_id
def test_health_issue_with_empty_status(self):
"""Test when health issue has empty/None status (treated as not open)."""
defenderidentity_client = mock.MagicMock()
defenderidentity_client.audited_tenant = "audited_tenant"
defenderidentity_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open.defenderidentity_client",
new=defenderidentity_client,
),
):
from prowler.providers.m365.services.defenderidentity.defenderidentity_health_issues_no_open.defenderidentity_health_issues_no_open import (
defenderidentity_health_issues_no_open,
)
from prowler.providers.m365.services.defenderidentity.defenderidentity_service import (
HealthIssue,
)
health_issue_id = "test-health-issue-id-7"
health_issue_name = "Empty Status Issue"
defenderidentity_client.sensors = [create_mock_sensor()]
defenderidentity_client.health_issues = [
HealthIssue(
id=health_issue_id,
display_name=health_issue_name,
description="A health issue with empty status",
health_issue_type="global",
severity="medium",
status=None,
created_date_time="2024-01-01T00:00:00Z",
last_modified_date_time="2024-01-02T00:00:00Z",
domain_names=[],
sensor_dns_names=[],
issue_type_id="test-issue-type-7",
recommendations=[],
additional_information=[],
)
]
check = defenderidentity_health_issues_no_open()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Defender for Identity global health issue {health_issue_name} is resolved."
)
assert result[0].resource_id == health_issue_id
assert result[0].resource_name == health_issue_name

View File

@@ -1,218 +0,0 @@
from unittest import mock
from prowler.providers.m365.services.defenderxdr.defenderxdr_service import (
PendingCAMApproval,
)
from tests.providers.m365.m365_fixtures import DOMAIN, set_mocked_m365_provider
class Test_defenderxdr_critical_asset_management_pending_approvals:
"""Tests for the defenderxdr_critical_asset_management_pending_approvals check."""
def test_api_failed_missing_permission(self):
"""Test FAIL when API call fails (None): missing ThreatHunting.Read.All permission."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.pending_cam_approvals = None
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals import (
defenderxdr_critical_asset_management_pending_approvals,
)
check = defenderxdr_critical_asset_management_pending_approvals()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
"Unable to query Critical Asset Management" in result[0].status_extended
)
assert "ThreatHunting.Read.All" in result[0].status_extended
assert result[0].resource_id == "criticalAssetManagement"
def test_no_pending_approvals_pass(self):
"""Test PASS scenario when there are no pending CAM approvals."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.pending_cam_approvals = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals import (
defenderxdr_critical_asset_management_pending_approvals,
)
check = defenderxdr_critical_asset_management_pending_approvals()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "No pending approvals for Critical Asset Management classifications are found."
)
assert result[0].resource_name == "Critical Asset Management"
assert result[0].resource_id == "criticalAssetManagement"
assert result[0].resource == {}
def test_single_pending_approval_fail(self):
"""Test FAIL scenario when there is one pending CAM approval."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.pending_cam_approvals = [
PendingCAMApproval(
classification="HighValue",
pending_count=2,
assets=["server-01", "server-02"],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals import (
defenderxdr_critical_asset_management_pending_approvals,
)
check = defenderxdr_critical_asset_management_pending_approvals()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Critical Asset Management classification 'HighValue' has 2 asset(s) pending approval: server-01, server-02."
)
assert result[0].resource_name == "CAM Classification: HighValue"
assert result[0].resource_id == "cam/HighValue"
assert (
result[0].resource == defenderxdr_client.pending_cam_approvals[0].dict()
)
def test_multiple_pending_approvals_fail(self):
"""Test FAIL scenario when there are multiple pending CAM approvals."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.pending_cam_approvals = [
PendingCAMApproval(
classification="HighValue",
pending_count=1,
assets=["server-01"],
),
PendingCAMApproval(
classification="Critical",
pending_count=3,
assets=["db-01", "db-02", "db-03"],
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals import (
defenderxdr_critical_asset_management_pending_approvals,
)
check = defenderxdr_critical_asset_management_pending_approvals()
result = check.execute()
assert len(result) == 2
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Critical Asset Management classification 'HighValue' has 1 asset(s) pending approval: server-01."
)
assert result[0].resource_name == "CAM Classification: HighValue"
assert result[0].resource_id == "cam/HighValue"
assert result[1].status == "FAIL"
assert (
result[1].status_extended
== "Critical Asset Management classification 'Critical' has 3 asset(s) pending approval: db-01, db-02, db-03."
)
assert result[1].resource_name == "CAM Classification: Critical"
assert result[1].resource_id == "cam/Critical"
def test_pending_approval_with_more_than_five_assets_fail(self):
"""Test FAIL scenario with more than 5 assets to verify truncation."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.pending_cam_approvals = [
PendingCAMApproval(
classification="HighValue",
pending_count=7,
assets=[
"server-01",
"server-02",
"server-03",
"server-04",
"server-05",
"server-06",
"server-07",
],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_critical_asset_management_pending_approvals.defenderxdr_critical_asset_management_pending_approvals import (
defenderxdr_critical_asset_management_pending_approvals,
)
check = defenderxdr_critical_asset_management_pending_approvals()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Critical Asset Management classification 'HighValue' has 7 asset(s) pending approval: server-01, server-02, server-03, server-04, server-05 and 2 more."
)
assert result[0].resource_name == "CAM Classification: HighValue"
assert result[0].resource_id == "cam/HighValue"

View File

@@ -1,375 +0,0 @@
from unittest import mock
from tests.providers.m365.m365_fixtures import DOMAIN, set_mocked_m365_provider
class Test_defenderxdr_endpoint_privileged_user_exposed_credentials:
"""Tests for the defenderxdr_endpoint_privileged_user_exposed_credentials check."""
def test_mde_status_api_failed(self):
"""Test FAIL when MDE status API call fails (None): missing permission."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = None
defenderxdr_client.exposed_credentials_privileged_users = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "Unable to query Microsoft Defender XDR" in result[0].status_extended
assert "ThreatHunting.Read.All" in result[0].status_extended
assert result[0].resource_id == "mdeStatus"
def test_mde_not_enabled(self):
"""Test FAIL when MDE is not enabled - security blind spot."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "not_enabled"
defenderxdr_client.exposed_credentials_privileged_users = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
"Microsoft Defender for Endpoint is not enabled"
in result[0].status_extended
)
assert "no visibility" in result[0].status_extended
assert result[0].resource_id == "mdeStatus"
def test_mde_no_devices(self):
"""Test PASS when MDE is enabled but no devices are onboarded."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "no_devices"
defenderxdr_client.exposed_credentials_privileged_users = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert "enabled but no devices are onboarded" in result[0].status_extended
assert "No endpoints to evaluate" in result[0].status_extended
assert result[0].resource_id == "mdeDevices"
def test_exposed_credentials_query_failed(self):
"""Test FAIL when exposed credentials query fails (None)."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "active"
defenderxdr_client.exposed_credentials_privileged_users = None
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
"Unable to query Security Exposure Management"
in result[0].status_extended
)
assert result[0].resource_id == "exposedCredentials"
def test_no_exposed_credentials(self):
"""Test PASS when no privileged users have exposed credentials."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "active"
defenderxdr_client.exposed_credentials_privileged_users = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
"No exposed credentials found for privileged users"
in result[0].status_extended
)
assert result[0].resource_name == "Defender XDR Exposure Management"
assert result[0].resource_id == "exposedCredentials"
def test_single_exposed_credential_with_credential_type(self):
"""Test FAIL when a privileged user has exposed credentials with type."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "active"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
from prowler.providers.m365.services.defenderxdr.defenderxdr_service import (
ExposedCredentialPrivilegedUser,
)
exposed_user = ExposedCredentialPrivilegedUser(
edge_id="edge-123",
source_node_id="device-456",
source_node_name="WORKSTATION01",
source_node_label="device",
target_node_id="user-789",
target_node_name="admin@contoso.com",
target_node_label="user",
credential_type="CLI secret",
target_categories=["PrivilegedEntraIdRole"],
)
defenderxdr_client.exposed_credentials_privileged_users = [exposed_user]
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "admin@contoso.com" in result[0].status_extended
assert "CLI secret" in result[0].status_extended
assert "WORKSTATION01" in result[0].status_extended
assert result[0].resource_name == "admin@contoso.com"
assert result[0].resource_id == "user-789"
def test_single_exposed_credential_without_credential_type(self):
"""Test FAIL when a privileged user has exposed credentials without type."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "active"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
from prowler.providers.m365.services.defenderxdr.defenderxdr_service import (
ExposedCredentialPrivilegedUser,
)
exposed_user = ExposedCredentialPrivilegedUser(
edge_id="edge-123",
source_node_id="device-456",
source_node_name="WORKSTATION01",
source_node_label="device",
target_node_id="user-789",
target_node_name="admin@contoso.com",
target_node_label="user",
credential_type=None,
target_categories=["PrivilegedEntraIdRole"],
)
defenderxdr_client.exposed_credentials_privileged_users = [exposed_user]
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "admin@contoso.com" in result[0].status_extended
assert "WORKSTATION01" in result[0].status_extended
assert result[0].resource_name == "admin@contoso.com"
assert result[0].resource_id == "user-789"
def test_multiple_exposed_credentials(self):
"""Test FAIL for multiple privileged users with exposed credentials."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "active"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
from prowler.providers.m365.services.defenderxdr.defenderxdr_service import (
ExposedCredentialPrivilegedUser,
)
exposed_user_1 = ExposedCredentialPrivilegedUser(
edge_id="edge-123",
source_node_id="device-456",
source_node_name="WORKSTATION01",
source_node_label="device",
target_node_id="user-789",
target_node_name="admin@contoso.com",
target_node_label="user",
credential_type="CLI secret",
target_categories=["PrivilegedEntraIdRole"],
)
exposed_user_2 = ExposedCredentialPrivilegedUser(
edge_id="edge-456",
source_node_id="device-789",
source_node_name="SERVER01",
source_node_label="device",
target_node_id="user-012",
target_node_name="globaladmin@contoso.com",
target_node_label="user",
credential_type="user cookie",
target_categories=["PrivilegedEntraIdRole", "privileged"],
)
defenderxdr_client.exposed_credentials_privileged_users = [
exposed_user_1,
exposed_user_2,
]
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 2
assert result[0].status == "FAIL"
assert result[0].resource_name == "admin@contoso.com"
assert result[1].status == "FAIL"
assert result[1].resource_name == "globaladmin@contoso.com"
def test_exposed_credential_uses_edge_id_when_target_node_id_missing(self):
"""Test that edge_id is used as resource_id when target_node_id is empty."""
defenderxdr_client = mock.MagicMock()
defenderxdr_client.audited_tenant = "audited_tenant"
defenderxdr_client.audited_domain = DOMAIN
defenderxdr_client.mde_status = "active"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_client",
new=defenderxdr_client,
),
):
from prowler.providers.m365.services.defenderxdr.defenderxdr_endpoint_privileged_user_exposed_credentials.defenderxdr_endpoint_privileged_user_exposed_credentials import (
defenderxdr_endpoint_privileged_user_exposed_credentials,
)
from prowler.providers.m365.services.defenderxdr.defenderxdr_service import (
ExposedCredentialPrivilegedUser,
)
exposed_user = ExposedCredentialPrivilegedUser(
edge_id="edge-fallback-123",
source_node_id="device-456",
source_node_name="WORKSTATION01",
source_node_label="device",
target_node_id="",
target_node_name="admin@contoso.com",
target_node_label="user",
credential_type="sensitive token",
target_categories=["PrivilegedEntraIdRole"],
)
defenderxdr_client.exposed_credentials_privileged_users = [exposed_user]
check = defenderxdr_endpoint_privileged_user_exposed_credentials()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "edge-fallback-123"
assert result[0].resource_name == "admin@contoso.com"

View File

@@ -1,895 +0,0 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.m365.services.entra.entra_service import (
OAuthApp,
OAuthAppPermission,
)
from tests.providers.m365.m365_fixtures import DOMAIN, set_mocked_m365_provider
class Test_entra_app_registration_no_unused_privileged_permissions:
def test_no_oauth_apps(self):
"""No OAuth apps registered in tenant (empty dict): expected PASS."""
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "No OAuth applications are registered in the tenant."
)
assert result[0].resource == {}
assert result[0].resource_name == "OAuth Applications"
assert result[0].resource_id == "oauthApps"
def test_no_oauth_apps_none(self):
"""OAuth apps is None (App Governance not enabled): expected FAIL."""
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = None
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "OAuth App Governance data is unavailable. Enable App Governance in Microsoft Defender for Cloud Apps and grant ThreatHunting.Read.All to evaluate unused privileged permissions."
)
assert result[0].resource == {}
assert result[0].resource_name == "OAuth Applications"
assert result[0].resource_id == "oauthApps"
def test_app_no_permissions(self):
"""App with no permissions: expected PASS."""
app_id = str(uuid4())
app_name = "Test App No Permissions"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name=app_name,
status="Enabled",
privilege_level="Low",
permissions=[],
service_principal_id=str(uuid4()),
is_admin_consented=False,
last_used_time=None,
app_origin="Internal",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"App registration {app_name} has no unused privileged permissions."
)
assert result[0].resource_name == app_name
assert result[0].resource_id == app_id
def test_app_all_permissions_in_use(self):
"""App with all privileged permissions in use: expected PASS."""
app_id = str(uuid4())
app_name = "Test App All In Use"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name=app_name,
status="Enabled",
privilege_level="High",
permissions=[
OAuthAppPermission(
name="Mail.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="InUse",
),
OAuthAppPermission(
name="User.Read.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="InUse",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=True,
last_used_time="2024-01-15T10:30:00Z",
app_origin="Internal",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"App registration {app_name} has no unused privileged permissions."
)
assert result[0].resource_name == app_name
assert result[0].resource_id == app_id
def test_app_low_privilege_unused(self):
"""App with unused low privilege permissions (not high): expected PASS."""
app_id = str(uuid4())
app_name = "Test App Low Privilege Unused"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name=app_name,
status="Enabled",
privilege_level="Low",
permissions=[
OAuthAppPermission(
name="User.Read",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Delegated",
privilege_level="Low",
usage_status="NotInUse",
),
OAuthAppPermission(
name="openid",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Delegated",
privilege_level="Low",
usage_status="NotInUse",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=False,
last_used_time=None,
app_origin="External",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"App registration {app_name} has no unused privileged permissions."
)
assert result[0].resource_name == app_name
assert result[0].resource_id == app_id
def test_app_medium_privilege_unused(self):
"""App with unused medium privilege permissions (not high): expected PASS."""
app_id = str(uuid4())
app_name = "Test App Medium Privilege Unused"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name=app_name,
status="Enabled",
privilege_level="Medium",
permissions=[
OAuthAppPermission(
name="Files.Read",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Delegated",
privilege_level="Medium",
usage_status="NotInUse",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=False,
last_used_time=None,
app_origin="External",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"App registration {app_name} has no unused privileged permissions."
)
assert result[0].resource_name == app_name
assert result[0].resource_id == app_id
def test_app_one_unused_high_privilege_permission(self):
"""App with one unused high privilege permission: expected FAIL."""
app_id = str(uuid4())
app_name = "Test App One Unused High"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name=app_name,
status="Enabled",
privilege_level="High",
permissions=[
OAuthAppPermission(
name="Mail.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="User.Read",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Delegated",
privilege_level="Low",
usage_status="InUse",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=True,
last_used_time="2024-01-15T10:30:00Z",
app_origin="Internal",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"App registration {app_name} has 1 unused privileged permission(s): Mail.ReadWrite.All."
)
assert result[0].resource_name == app_name
assert result[0].resource_id == app_id
def test_app_multiple_unused_high_privilege_permissions(self):
"""App with multiple unused high privilege permissions: expected FAIL."""
app_id = str(uuid4())
app_name = "Test App Multiple Unused High"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name=app_name,
status="Enabled",
privilege_level="High",
permissions=[
OAuthAppPermission(
name="Mail.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="Directory.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="User.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=True,
last_used_time="2024-01-15T10:30:00Z",
app_origin="External",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"App registration {app_name} has 3 unused privileged permission(s): Mail.ReadWrite.All, Directory.ReadWrite.All, User.ReadWrite.All."
)
assert result[0].resource_name == app_name
assert result[0].resource_id == app_id
def test_app_more_than_five_unused_high_privilege_permissions(self):
"""App with more than 5 unused high privilege permissions: expected FAIL with truncated list."""
app_id = str(uuid4())
app_name = "Test App Many Unused High"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name=app_name,
status="Enabled",
privilege_level="High",
permissions=[
OAuthAppPermission(
name="Mail.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="Directory.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="User.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="Group.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="Sites.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="RoleManagement.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="Application.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=True,
last_used_time="2024-01-15T10:30:00Z",
app_origin="External",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"App registration {app_name} has 7 unused privileged permission(s): Mail.ReadWrite.All, Directory.ReadWrite.All, User.ReadWrite.All, Group.ReadWrite.All, Sites.ReadWrite.All (and 2 more)."
)
assert result[0].resource_name == app_name
assert result[0].resource_id == app_id
def test_app_unused_with_not_in_use_status(self):
"""App with unused permission using 'not_in_use' status variant: expected FAIL."""
app_id = str(uuid4())
app_name = "Test App NotInUse Variant"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name=app_name,
status="Enabled",
privilege_level="High",
permissions=[
OAuthAppPermission(
name="Mail.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="not_in_use",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=True,
last_used_time=None,
app_origin="Internal",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"App registration {app_name} has 1 unused privileged permission(s): Mail.ReadWrite.All."
)
assert result[0].resource_name == app_name
assert result[0].resource_id == app_id
def test_multiple_apps_mixed_results(self):
"""Multiple apps with mixed results: one PASS and one FAIL."""
app_id_pass = str(uuid4())
app_name_pass = "Test App Pass"
app_id_fail = str(uuid4())
app_name_fail = "Test App Fail"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id_pass: OAuthApp(
id=app_id_pass,
name=app_name_pass,
status="Enabled",
privilege_level="High",
permissions=[
OAuthAppPermission(
name="Mail.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="InUse",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=True,
last_used_time="2024-01-15T10:30:00Z",
app_origin="Internal",
),
app_id_fail: OAuthApp(
id=app_id_fail,
name=app_name_fail,
status="Enabled",
privilege_level="High",
permissions=[
OAuthAppPermission(
name="Directory.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=True,
last_used_time="2024-01-15T10:30:00Z",
app_origin="External",
),
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 2
# Find results by app ID
result_pass = next(r for r in result if r.resource_id == app_id_pass)
result_fail = next(r for r in result if r.resource_id == app_id_fail)
assert result_pass.status == "PASS"
assert (
result_pass.status_extended
== f"App registration {app_name_pass} has no unused privileged permissions."
)
assert result_pass.resource_name == app_name_pass
assert result_fail.status == "FAIL"
assert (
result_fail.status_extended
== f"App registration {app_name_fail} has 1 unused privileged permission(s): Directory.ReadWrite.All."
)
assert result_fail.resource_name == app_name_fail
def test_app_mixed_privilege_levels_unused(self):
"""App with mixed privilege levels (High and Low) unused: only High triggers FAIL."""
app_id = str(uuid4())
app_name = "Test App Mixed Privileges"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name=app_name,
status="Enabled",
privilege_level="High",
permissions=[
OAuthAppPermission(
name="Mail.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="User.Read",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Delegated",
privilege_level="Low",
usage_status="NotInUse",
),
OAuthAppPermission(
name="Files.Read",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Delegated",
privilege_level="Medium",
usage_status="NotInUse",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=True,
last_used_time="2024-01-15T10:30:00Z",
app_origin="Internal",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
# Only the High privilege permission should be reported
assert (
result[0].status_extended
== f"App registration {app_name} has 1 unused privileged permission(s): Mail.ReadWrite.All."
)
assert result[0].resource_name == app_name
assert result[0].resource_id == app_id
def test_app_high_privilege_in_use_and_unused(self):
"""App with some high privilege permissions in use and some unused: expected FAIL."""
app_id = str(uuid4())
app_name = "Test App Partial Usage"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name=app_name,
status="Enabled",
privilege_level="High",
permissions=[
OAuthAppPermission(
name="Mail.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="InUse",
),
OAuthAppPermission(
name="Directory.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="NotInUse",
),
OAuthAppPermission(
name="User.ReadWrite.All",
target_app_id="00000003-0000-0000-c000-000000000000",
target_app_name="Microsoft Graph",
permission_type="Application",
privilege_level="High",
usage_status="InUse",
),
],
service_principal_id=str(uuid4()),
is_admin_consented=True,
last_used_time="2024-01-15T10:30:00Z",
app_origin="Internal",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"App registration {app_name} has 1 unused privileged permission(s): Directory.ReadWrite.All."
)
assert result[0].resource_name == app_name
assert result[0].resource_id == app_id
def test_app_without_name_uses_id(self):
"""App without a name should use app_id as resource_name."""
app_id = str(uuid4())
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_app_registration_no_unused_privileged_permissions.entra_app_registration_no_unused_privileged_permissions import (
entra_app_registration_no_unused_privileged_permissions,
)
entra_client.oauth_apps = {
app_id: OAuthApp(
id=app_id,
name="",
status="Enabled",
privilege_level="Low",
permissions=[],
service_principal_id=str(uuid4()),
is_admin_consented=False,
last_used_time=None,
app_origin="Internal",
)
}
check = entra_app_registration_no_unused_privileged_permissions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_name == ""
assert result[0].resource_id == app_id

View File

@@ -1,274 +0,0 @@
from unittest import mock
from prowler.providers.m365.services.entra.entra_service import (
DirectorySyncSettings,
Organization,
)
from tests.providers.m365.m365_fixtures import set_mocked_m365_provider
class Test_entra_seamless_sso_disabled:
def test_seamless_sso_disabled(self):
"""Test PASS when Seamless SSO is disabled in directory sync settings."""
entra_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled import (
entra_seamless_sso_disabled,
)
sync_settings = DirectorySyncSettings(
id="sync-001",
password_sync_enabled=True,
seamless_sso_enabled=False,
)
entra_client.directory_sync_settings = [sync_settings]
entra_client.directory_sync_error = None
entra_client.organizations = []
check = entra_seamless_sso_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Entra directory sync sync-001 has Seamless SSO disabled."
)
assert result[0].resource_id == "sync-001"
assert result[0].resource_name == "Directory Sync sync-001"
assert result[0].location == "global"
def test_seamless_sso_enabled(self):
"""Test FAIL when Seamless SSO is enabled in directory sync settings."""
entra_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled import (
entra_seamless_sso_disabled,
)
sync_settings = DirectorySyncSettings(
id="sync-001",
password_sync_enabled=True,
seamless_sso_enabled=True,
)
entra_client.directory_sync_settings = [sync_settings]
entra_client.directory_sync_error = None
entra_client.organizations = []
check = entra_seamless_sso_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Entra directory sync sync-001 has Seamless SSO enabled, which can be exploited for lateral movement and brute force attacks."
)
assert result[0].resource_id == "sync-001"
assert result[0].resource_name == "Directory Sync sync-001"
assert result[0].location == "global"
def test_multiple_sync_settings_mixed(self):
"""Test mixed results with multiple directory sync configurations."""
entra_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled import (
entra_seamless_sso_disabled,
)
sync_settings_1 = DirectorySyncSettings(
id="sync-001",
password_sync_enabled=True,
seamless_sso_enabled=True,
)
sync_settings_2 = DirectorySyncSettings(
id="sync-002",
password_sync_enabled=True,
seamless_sso_enabled=False,
)
entra_client.directory_sync_settings = [sync_settings_1, sync_settings_2]
entra_client.directory_sync_error = None
entra_client.organizations = []
check = entra_seamless_sso_disabled()
result = check.execute()
assert len(result) == 2
assert result[0].status == "FAIL"
assert result[0].resource_id == "sync-001"
assert result[1].status == "PASS"
assert result[1].resource_id == "sync-002"
def test_cloud_only_no_sync_settings(self):
"""Test PASS for cloud-only tenant with no directory sync settings."""
entra_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled import (
entra_seamless_sso_disabled,
)
org = Organization(
id="org1",
name="Cloud Only Org",
on_premises_sync_enabled=False,
)
entra_client.directory_sync_settings = []
entra_client.directory_sync_error = None
entra_client.organizations = [org]
check = entra_seamless_sso_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Entra organization Cloud Only Org is cloud-only (no on-premises sync), Seamless SSO is not applicable."
)
assert result[0].resource_id == "org1"
assert result[0].resource_name == "Cloud Only Org"
def test_insufficient_permissions_error(self):
"""Test FAIL when there's a permission error reading directory sync settings."""
entra_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled import (
entra_seamless_sso_disabled,
)
org = Organization(
id="org1",
name="Prowler Org",
on_premises_sync_enabled=True,
)
entra_client.directory_sync_settings = []
entra_client.directory_sync_error = "Insufficient privileges to read directory sync settings. Required permission: OnPremDirectorySynchronization.Read.All or OnPremDirectorySynchronization.ReadWrite.All"
entra_client.organizations = [org]
check = entra_seamless_sso_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "Cannot verify Seamless SSO status" in result[0].status_extended
assert "Insufficient privileges" in result[0].status_extended
assert (
"OnPremDirectorySynchronization.Read.All" in result[0].status_extended
)
assert result[0].resource_id == "org1"
assert result[0].resource_name == "Prowler Org"
def test_insufficient_permissions_cloud_only_passes(self):
"""Test PASS for cloud-only org even when there's a permission error."""
entra_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled import (
entra_seamless_sso_disabled,
)
# Cloud-only org (on_premises_sync_enabled=False)
org = Organization(
id="org1",
name="Cloud Only Org",
on_premises_sync_enabled=False,
)
entra_client.directory_sync_settings = []
entra_client.directory_sync_error = (
"Insufficient privileges to read directory sync settings."
)
entra_client.organizations = [org]
check = entra_seamless_sso_disabled()
result = check.execute()
# Should PASS because cloud-only orgs don't need this permission
assert len(result) == 1
assert result[0].status == "PASS"
assert "cloud-only" in result[0].status_extended
assert result[0].resource_id == "org1"
def test_empty_everything(self):
"""Test no findings when both sync settings and organizations are empty."""
entra_client = mock.MagicMock()
entra_client.directory_sync_settings = []
entra_client.directory_sync_error = None
entra_client.organizations = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_seamless_sso_disabled.entra_seamless_sso_disabled import (
entra_seamless_sso_disabled,
)
check = entra_seamless_sso_disabled()
result = check.execute()
assert len(result) == 0

View File

@@ -6,8 +6,6 @@ All notable changes to the **Prowler UI** are documented in this file.
### 🚀 Added
- Image (Container Registry) provider support in UI: badge icon, credentials form, and provider-type filtering
- OpenStack provider support in the UI [(#10046)](https://github.com/prowler-cloud/prowler/pull/10046)
- PDF report available for the CSA CCM compliance framework [(#10088)](https://github.com/prowler-cloud/prowler/pull/10088)
- CSV and PDF download buttons in compliance views [(#10093)](https://github.com/prowler-cloud/prowler/pull/10093)

View File

@@ -10,11 +10,9 @@ import {
GCPProviderBadge,
GitHubProviderBadge,
IacProviderBadge,
ImageProviderBadge,
KS8ProviderBadge,
M365ProviderBadge,
MongoDBAtlasProviderBadge,
OpenStackProviderBadge,
OracleCloudProviderBadge,
} from "@/components/icons/providers-badge";
import {
@@ -35,11 +33,9 @@ const PROVIDER_ICON: Record<ProviderType, ReactNode> = {
m365: <M365ProviderBadge width={18} height={18} />,
github: <GitHubProviderBadge width={18} height={18} />,
iac: <IacProviderBadge width={18} height={18} />,
image: <ImageProviderBadge width={18} height={18} />,
oraclecloud: <OracleCloudProviderBadge width={18} height={18} />,
mongodbatlas: <MongoDBAtlasProviderBadge width={18} height={18} />,
alibabacloud: <AlibabaCloudProviderBadge width={18} height={18} />,
openstack: <OpenStackProviderBadge width={18} height={18} />,
};
interface AccountsSelectorProps {

View File

@@ -1,7 +1,7 @@
"use client";
import { useSearchParams } from "next/navigation";
import { type ComponentType, lazy, Suspense } from "react";
import { lazy, Suspense } from "react";
import {
MultiSelect,
@@ -48,11 +48,6 @@ const IacProviderBadge = lazy(() =>
default: m.IacProviderBadge,
})),
);
const ImageProviderBadge = lazy(() =>
import("@/components/icons/providers-badge").then((m) => ({
default: m.ImageProviderBadge,
})),
);
const OracleCloudProviderBadge = lazy(() =>
import("@/components/icons/providers-badge").then((m) => ({
default: m.OracleCloudProviderBadge,
@@ -68,11 +63,6 @@ const AlibabaCloudProviderBadge = lazy(() =>
default: m.AlibabaCloudProviderBadge,
})),
);
const OpenStackProviderBadge = lazy(() =>
import("@/components/icons/providers-badge").then((m) => ({
default: m.OpenStackProviderBadge,
})),
);
type IconProps = { width: number; height: number };
@@ -82,7 +72,7 @@ const IconPlaceholder = ({ width, height }: IconProps) => (
const PROVIDER_DATA: Record<
ProviderType,
{ label: string; icon: ComponentType<IconProps> }
{ label: string; icon: React.ComponentType<IconProps> }
> = {
aws: {
label: "Amazon Web Services",
@@ -112,10 +102,6 @@ const PROVIDER_DATA: Record<
label: "Infrastructure as Code",
icon: IacProviderBadge,
},
image: {
label: "Container Registry",
icon: ImageProviderBadge,
},
oraclecloud: {
label: "Oracle Cloud Infrastructure",
icon: OracleCloudProviderBadge,
@@ -128,10 +114,6 @@ const PROVIDER_DATA: Record<
label: "Alibaba Cloud",
icon: AlibabaCloudProviderBadge,
},
openstack: {
label: "OpenStack",
icon: OpenStackProviderBadge,
},
};
type ProviderTypeSelectorProps = {

View File

@@ -5,11 +5,9 @@ import {
GCPProviderBadge,
GitHubProviderBadge,
IacProviderBadge,
ImageProviderBadge,
KS8ProviderBadge,
M365ProviderBadge,
MongoDBAtlasProviderBadge,
OpenStackProviderBadge,
OracleCloudProviderBadge,
} from "../icons/providers-badge";
@@ -85,15 +83,6 @@ export const CustomProviderInputIac = () => {
);
};
export const CustomProviderInputImage = () => {
return (
<div className="flex items-center gap-x-2">
<ImageProviderBadge width={25} height={25} />
<p className="text-sm">Container Registry</p>
</div>
);
};
export const CustomProviderInputOracleCloud = () => {
return (
<div className="flex items-center gap-x-2">
@@ -111,12 +100,3 @@ export const CustomProviderInputAlibabaCloud = () => {
</div>
);
};
export const CustomProviderInputOpenStack = () => {
return (
<div className="flex items-center gap-x-2">
<OpenStackProviderBadge width={25} height={25} />
<p className="text-sm">OpenStack</p>
</div>
);
};

View File

@@ -5,11 +5,9 @@ import {
GCPProviderBadge,
GitHubProviderBadge,
IacProviderBadge,
ImageProviderBadge,
KS8ProviderBadge,
M365ProviderBadge,
MongoDBAtlasProviderBadge,
OpenStackProviderBadge,
OracleCloudProviderBadge,
} from "@/components/icons/providers-badge";
import { ProviderType } from "@/types";
@@ -22,11 +20,9 @@ export const PROVIDER_ICONS = {
m365: M365ProviderBadge,
github: GitHubProviderBadge,
iac: IacProviderBadge,
image: ImageProviderBadge,
oraclecloud: OracleCloudProviderBadge,
mongodbatlas: MongoDBAtlasProviderBadge,
alibabacloud: AlibabaCloudProviderBadge,
openstack: OpenStackProviderBadge,
} as const;
interface ProviderIconCellProps {

View File

@@ -1,36 +0,0 @@
import { FC } from "react";
import { IconSvgProps } from "@/types";
export const ImageProviderBadge: FC<IconSvgProps> = ({
size,
width,
height,
...props
}) => (
<svg
xmlns="http://www.w3.org/2000/svg"
aria-hidden="true"
fill="none"
focusable="false"
height={size || height}
role="presentation"
viewBox="0 0 256 256"
width={size || width}
{...props}
>
<rect width="256" height="256" fill="#1c1917" rx="60" />
<g
transform="translate(20, 20) scale(9)"
fill="none"
stroke="#fff"
strokeWidth="2"
strokeLinecap="round"
strokeLinejoin="round"
>
<path d="M12.89 1.45L21 5.75V18.25L12.89 22.55C12.33 22.84 11.67 22.84 11.11 22.55L3 18.25V5.75L11.11 1.45C11.67 1.16 12.33 1.16 12.89 1.45Z" />
<path d="M3.5 6L12 10.5L20.5 6" />
<path d="M12 22.5V10.5" />
</g>
</svg>
);

View File

@@ -8,11 +8,9 @@ import { AzureProviderBadge } from "./azure-provider-badge";
import { GCPProviderBadge } from "./gcp-provider-badge";
import { GitHubProviderBadge } from "./github-provider-badge";
import { IacProviderBadge } from "./iac-provider-badge";
import { ImageProviderBadge } from "./image-provider-badge";
import { KS8ProviderBadge } from "./ks8-provider-badge";
import { M365ProviderBadge } from "./m365-provider-badge";
import { MongoDBAtlasProviderBadge } from "./mongodbatlas-provider-badge";
import { OpenStackProviderBadge } from "./openstack-provider-badge";
import { OracleCloudProviderBadge } from "./oraclecloud-provider-badge";
export {
@@ -22,11 +20,9 @@ export {
GCPProviderBadge,
GitHubProviderBadge,
IacProviderBadge,
ImageProviderBadge,
KS8ProviderBadge,
M365ProviderBadge,
MongoDBAtlasProviderBadge,
OpenStackProviderBadge,
OracleCloudProviderBadge,
};
@@ -39,9 +35,7 @@ export const PROVIDER_ICONS: Record<string, FC<IconSvgProps>> = {
"Microsoft 365": M365ProviderBadge,
GitHub: GitHubProviderBadge,
"Infrastructure as Code": IacProviderBadge,
"Container Registry": ImageProviderBadge,
"Oracle Cloud Infrastructure": OracleCloudProviderBadge,
"MongoDB Atlas": MongoDBAtlasProviderBadge,
"Alibaba Cloud": AlibabaCloudProviderBadge,
OpenStack: OpenStackProviderBadge,
};

View File

@@ -1,29 +0,0 @@
import * as React from "react";
import { IconSvgProps } from "@/types";
export const OpenStackProviderBadge: React.FC<IconSvgProps> = ({
size,
width,
height,
...props
}) => (
<svg
xmlns="http://www.w3.org/2000/svg"
aria-hidden="true"
fill="none"
focusable="false"
height={size || height}
role="presentation"
viewBox="0 0 256 256"
width={size || width}
{...props}
>
<g fill="none">
<rect width="256" height="256" fill="#f4f2ed" rx="60" />
<g transform="translate(48 48) scale(2.5)" fill="#da1a32">
<path d="M58.054.68H5.946C2.676.68 0 3.356 0 6.626V20.64h14.452v-2.3c0-1.776 1.44-3.215 3.215-3.215h28.665c1.776 0 3.215 1.44 3.215 3.215v2.3H64v-14A5.97 5.97 0 0 0 58.054.68zm-8.506 44.97c0 1.776-1.44 3.215-3.215 3.215H17.67c-1.776 0-3.215-1.44-3.215-3.215v-2.3H0v14.013c0 3.27 2.676 5.946 5.946 5.946h52.108c3.27 0 5.946-2.676 5.946-5.946V43.36H49.548zM0 24.773h14.452v14.452H0zm49.548 0H64v14.452H49.548z" />
</g>
</g>
</svg>
);

View File

@@ -15,11 +15,9 @@ import {
GCPProviderBadge,
GitHubProviderBadge,
IacProviderBadge,
ImageProviderBadge,
KS8ProviderBadge,
M365ProviderBadge,
MongoDBAtlasProviderBadge,
OpenStackProviderBadge,
OracleCloudProviderBadge,
} from "../icons/providers-badge";
import { FormMessage } from "../ui/form";
@@ -65,11 +63,6 @@ const PROVIDERS = [
label: "Infrastructure as Code",
badge: IacProviderBadge,
},
{
value: "image",
label: "Container Registry",
badge: ImageProviderBadge,
},
{
value: "oraclecloud",
label: "Oracle Cloud Infrastructure",
@@ -80,11 +73,6 @@ const PROVIDERS = [
label: "Alibaba Cloud",
badge: AlibabaCloudProviderBadge,
},
{
value: "openstack",
label: "OpenStack",
badge: OpenStackProviderBadge,
},
] as const;
interface RadioGroupProviderProps {

View File

@@ -20,13 +20,11 @@ import {
GCPDefaultCredentials,
GCPServiceAccountKey,
IacCredentials,
ImageCredentials,
KubernetesCredentials,
M365CertificateCredentials,
M365ClientSecretCredentials,
MongoDBAtlasCredentials,
OCICredentials,
OpenStackCredentials,
ProviderType,
} from "@/types";
@@ -46,10 +44,8 @@ import {
import { AzureCredentialsForm } from "./via-credentials/azure-credentials-form";
import { GitHubCredentialsForm } from "./via-credentials/github-credentials-form";
import { IacCredentialsForm } from "./via-credentials/iac-credentials-form";
import { ImageCredentialsForm } from "./via-credentials/image-credentials-form";
import { KubernetesCredentialsForm } from "./via-credentials/k8s-credentials-form";
import { MongoDBAtlasCredentialsForm } from "./via-credentials/mongodbatlas-credentials-form";
import { OpenStackCredentialsForm } from "./via-credentials/openstack-credentials-form";
import { OracleCloudCredentialsForm } from "./via-credentials/oraclecloud-credentials-form";
type BaseCredentialsFormProps = {
@@ -182,11 +178,6 @@ export const BaseCredentialsForm = ({
control={form.control as unknown as Control<IacCredentials>}
/>
)}
{providerType === "image" && (
<ImageCredentialsForm
control={form.control as unknown as Control<ImageCredentials>}
/>
)}
{providerType === "oraclecloud" && (
<OracleCloudCredentialsForm
control={form.control as unknown as Control<OCICredentials>}
@@ -215,11 +206,6 @@ export const BaseCredentialsForm = ({
}
/>
)}
{providerType === "openstack" && (
<OpenStackCredentialsForm
control={form.control as unknown as Control<OpenStackCredentials>}
/>
)}
<div className="flex w-full justify-end gap-4">
{showBackButton && requiresBackButton(searchParamsObj.get("via")) && (

View File

@@ -57,11 +57,6 @@ const getProviderFieldDetails = (providerType?: ProviderType) => {
label: "Repository URL",
placeholder: "e.g. https://github.com/user/repo",
};
case "image":
return {
label: "Registry URL",
placeholder: "e.g. https://registry.example.com",
};
case "oraclecloud":
return {
label: "Tenancy OCID",
@@ -77,11 +72,6 @@ const getProviderFieldDetails = (providerType?: ProviderType) => {
label: "Account ID",
placeholder: "e.g. 1234567890123456",
};
case "openstack":
return {
label: "Project ID",
placeholder: "e.g. a1b2c3d4-e5f6-7890-abcd-ef1234567890",
};
default:
return {
label: "Provider UID",

View File

@@ -1,83 +0,0 @@
import { Control } from "react-hook-form";
import { CustomInput } from "@/components/ui/custom";
import { ImageCredentials } from "@/types";
export const ImageCredentialsForm = ({
control,
}: {
control: Control<ImageCredentials>;
}) => {
return (
<>
<div className="flex flex-col">
<div className="text-md text-default-foreground leading-9 font-bold">
Connect via Registry Credentials
</div>
<div className="text-default-500 text-sm">
Provide registry credentials to authenticate with your container
registry (all fields are optional).
</div>
</div>
<CustomInput
control={control}
name="registry_username"
label="Registry Username (Optional)"
labelPlacement="inside"
placeholder="Username for registry authentication"
variant="bordered"
type="text"
isRequired={false}
/>
<CustomInput
control={control}
name="registry_password"
label="Registry Password (Optional)"
labelPlacement="inside"
placeholder="Password for registry authentication"
variant="bordered"
type="password"
isRequired={false}
/>
<CustomInput
control={control}
name="registry_token"
label="Registry Token (Optional)"
labelPlacement="inside"
placeholder="Token for registry authentication"
variant="bordered"
type="password"
isRequired={false}
/>
<div className="flex flex-col pt-2">
<div className="text-md text-default-foreground leading-9 font-bold">
Scan Scope
</div>
<div className="text-default-500 text-sm">
Limit which repositories and tags are scanned using regex patterns.
</div>
</div>
<CustomInput
control={control}
name="image_filter"
label="Image Filter (Optional)"
labelPlacement="inside"
placeholder="e.g. ^prod/.*"
variant="bordered"
type="text"
isRequired={false}
/>
<CustomInput
control={control}
name="tag_filter"
label="Tag Filter (Optional)"
labelPlacement="inside"
placeholder="e.g. ^(latest|v\d+\.\d+\.\d+)$"
variant="bordered"
type="text"
isRequired={false}
/>
</>
);
};

View File

@@ -1,7 +1,5 @@
export * from "./azure-credentials-form";
export * from "./github-credentials-form";
export * from "./iac-credentials-form";
export * from "./image-credentials-form";
export * from "./k8s-credentials-form";
export * from "./mongodbatlas-credentials-form";
export * from "./openstack-credentials-form";

View File

@@ -1,43 +0,0 @@
import { Control } from "react-hook-form";
import { CustomInput, CustomTextarea } from "@/components/ui/custom";
import { OpenStackCredentials } from "@/types";
export const OpenStackCredentialsForm = ({
control,
}: {
control: Control<OpenStackCredentials>;
}) => {
return (
<>
<div className="flex flex-col">
<div className="text-md text-default-foreground leading-9 font-bold">
Connect via Clouds YAML
</div>
<div className="text-default-500 text-sm">
Please provide your OpenStack clouds.yaml content and the cloud name.
</div>
</div>
<CustomTextarea
control={control}
name="clouds_yaml_content"
label="Clouds YAML Content"
labelPlacement="inside"
placeholder="Paste your clouds.yaml content here"
variant="bordered"
minRows={10}
isRequired
/>
<CustomInput
control={control}
name="clouds_yaml_cloud"
type="text"
label="Cloud Name"
labelPlacement="inside"
placeholder="e.g. mycloud"
variant="bordered"
isRequired
/>
</>
);
};

View File

@@ -5,11 +5,9 @@ import {
GCPProviderBadge,
GitHubProviderBadge,
IacProviderBadge,
ImageProviderBadge,
KS8ProviderBadge,
M365ProviderBadge,
MongoDBAtlasProviderBadge,
OpenStackProviderBadge,
OracleCloudProviderBadge,
} from "@/components/icons/providers-badge";
import { ProviderType } from "@/types";
@@ -30,16 +28,12 @@ export const getProviderLogo = (provider: ProviderType) => {
return <GitHubProviderBadge width={35} height={35} />;
case "iac":
return <IacProviderBadge width={35} height={35} />;
case "image":
return <ImageProviderBadge width={35} height={35} />;
case "oraclecloud":
return <OracleCloudProviderBadge width={35} height={35} />;
case "mongodbatlas":
return <MongoDBAtlasProviderBadge width={35} height={35} />;
case "alibabacloud":
return <AlibabaCloudProviderBadge width={35} height={35} />;
case "openstack":
return <OpenStackProviderBadge width={35} height={35} />;
default:
return null;
}
@@ -61,16 +55,12 @@ export const getProviderName = (provider: ProviderType): string => {
return "GitHub";
case "iac":
return "Infrastructure as Code";
case "image":
return "Container Registry";
case "oraclecloud":
return "Oracle Cloud Infrastructure";
case "mongodbatlas":
return "MongoDB Atlas";
case "alibabacloud":
return "Alibaba Cloud";
case "openstack":
return "OpenStack";
default:
return "Unknown Provider";
}

View File

@@ -192,12 +192,6 @@ export const useCredentialsForm = ({
[ProviderCredentialFields.ALIBABACLOUD_ACCESS_KEY_ID]: "",
[ProviderCredentialFields.ALIBABACLOUD_ACCESS_KEY_SECRET]: "",
};
case "openstack":
return {
...baseDefaults,
[ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CONTENT]: "",
[ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CLOUD]: "",
};
default:
return baseDefaults;
}

View File

@@ -30,8 +30,4 @@ export const PROVIDER_CREDENTIALS_ERROR_MAPPING: Record<string, string> = {
ProviderCredentialFields.SERVICE_ACCOUNT_KEY,
[ErrorPointers.ATLAS_PUBLIC_KEY]: ProviderCredentialFields.ATLAS_PUBLIC_KEY,
[ErrorPointers.ATLAS_PRIVATE_KEY]: ProviderCredentialFields.ATLAS_PRIVATE_KEY,
[ErrorPointers.OPENSTACK_CLOUDS_YAML_CONTENT]:
ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CONTENT,
[ErrorPointers.OPENSTACK_CLOUDS_YAML_CLOUD]:
ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CLOUD,
};

View File

@@ -43,11 +43,6 @@ export const getProviderHelpText = (provider: string) => {
text: "Need help scanning your Infrastructure as Code repository?",
link: "https://goto.prowler.com/provider-iac",
};
case "image":
return {
text: "Need help scanning your container registry?",
link: "https://goto.prowler.com/provider-image",
};
case "oraclecloud":
return {
text: "Need help connecting your Oracle Cloud account?",
@@ -63,11 +58,6 @@ export const getProviderHelpText = (provider: string) => {
text: "Need help connecting your Alibaba Cloud account?",
link: "https://goto.prowler.com/provider-alibabacloud",
};
case "openstack":
return {
text: "Need help connecting your OpenStack cloud?",
link: "https://goto.prowler.com/provider-openstack",
};
default:
return {
text: "How to setup a provider?",

View File

@@ -250,20 +250,6 @@ export const buildAlibabaCloudSecret = (
return filterEmptyValues(secret);
};
export const buildOpenStackSecret = (formData: FormData) => {
const secret = {
[ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CONTENT]: getFormValue(
formData,
ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CONTENT,
),
[ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CLOUD]: getFormValue(
formData,
ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CLOUD,
),
};
return filterEmptyValues(secret);
};
export const buildIacSecret = (formData: FormData) => {
const secret = {
[ProviderCredentialFields.REPOSITORY_URL]: getFormValue(
@@ -278,32 +264,6 @@ export const buildIacSecret = (formData: FormData) => {
return filterEmptyValues(secret);
};
export const buildImageSecret = (formData: FormData) => {
const secret = {
[ProviderCredentialFields.REGISTRY_USERNAME]: getFormValue(
formData,
ProviderCredentialFields.REGISTRY_USERNAME,
),
[ProviderCredentialFields.REGISTRY_PASSWORD]: getFormValue(
formData,
ProviderCredentialFields.REGISTRY_PASSWORD,
),
[ProviderCredentialFields.REGISTRY_TOKEN]: getFormValue(
formData,
ProviderCredentialFields.REGISTRY_TOKEN,
),
[ProviderCredentialFields.IMAGE_FILTER]: getFormValue(
formData,
ProviderCredentialFields.IMAGE_FILTER,
),
[ProviderCredentialFields.TAG_FILTER]: getFormValue(
formData,
ProviderCredentialFields.TAG_FILTER,
),
};
return filterEmptyValues(secret);
};
/**
* Utility function to safely encode a string to base64
* Handles UTF-8 characters properly without using deprecated APIs
@@ -397,10 +357,6 @@ export const buildSecretConfig = (
secretType: "static",
secret: buildIacSecret(formData),
}),
image: () => ({
secretType: "static",
secret: buildImageSecret(formData),
}),
oraclecloud: () => ({
secretType: "static",
secret: buildOracleCloudSecret(formData, providerUid),
@@ -417,10 +373,6 @@ export const buildSecretConfig = (
secret: buildAlibabaCloudSecret(formData, isRole),
};
},
openstack: () => ({
secretType: "static",
secret: buildOpenStackSecret(formData),
}),
};
const builder = secretBuilders[providerType];

View File

@@ -53,13 +53,6 @@ export const ProviderCredentialFields = {
REPOSITORY_URL: "repository_url",
ACCESS_TOKEN: "access_token",
// Image (Container Registry) fields
REGISTRY_USERNAME: "registry_username",
REGISTRY_PASSWORD: "registry_password",
REGISTRY_TOKEN: "registry_token",
IMAGE_FILTER: "image_filter",
TAG_FILTER: "tag_filter",
// OCI fields
OCI_USER: "user",
OCI_FINGERPRINT: "fingerprint",
@@ -74,10 +67,6 @@ export const ProviderCredentialFields = {
ALIBABACLOUD_ACCESS_KEY_SECRET: "access_key_secret",
ALIBABACLOUD_ROLE_ARN: "role_arn",
ALIBABACLOUD_ROLE_SESSION_NAME: "role_session_name",
// OpenStack fields
OPENSTACK_CLOUDS_YAML_CONTENT: "clouds_yaml_content",
OPENSTACK_CLOUDS_YAML_CLOUD: "clouds_yaml_cloud",
} as const;
// Type for credential field values
@@ -108,11 +97,6 @@ export const ErrorPointers = {
GITHUB_APP_KEY: "/data/attributes/secret/github_app_key_content",
REPOSITORY_URL: "/data/attributes/secret/repository_url",
ACCESS_TOKEN: "/data/attributes/secret/access_token",
REGISTRY_USERNAME: "/data/attributes/secret/registry_username",
REGISTRY_PASSWORD: "/data/attributes/secret/registry_password",
REGISTRY_TOKEN: "/data/attributes/secret/registry_token",
IMAGE_FILTER: "/data/attributes/secret/image_filter",
TAG_FILTER: "/data/attributes/secret/tag_filter",
CERTIFICATE_CONTENT: "/data/attributes/secret/certificate_content",
OCI_USER: "/data/attributes/secret/user",
OCI_FINGERPRINT: "/data/attributes/secret/fingerprint",
@@ -127,8 +111,6 @@ export const ErrorPointers = {
ALIBABACLOUD_ACCESS_KEY_SECRET: "/data/attributes/secret/access_key_secret",
ALIBABACLOUD_ROLE_ARN: "/data/attributes/secret/role_arn",
ALIBABACLOUD_ROLE_SESSION_NAME: "/data/attributes/secret/role_session_name",
OPENSTACK_CLOUDS_YAML_CONTENT: "/data/attributes/secret/clouds_yaml_content",
OPENSTACK_CLOUDS_YAML_CLOUD: "/data/attributes/secret/clouds_yaml_cloud",
} as const;
export type ErrorPointer = (typeof ErrorPointers)[keyof typeof ErrorPointers];

View File

@@ -304,15 +304,6 @@ export type IacCredentials = {
[ProviderCredentialFields.PROVIDER_ID]: string;
};
export type ImageCredentials = {
[ProviderCredentialFields.REGISTRY_USERNAME]?: string;
[ProviderCredentialFields.REGISTRY_PASSWORD]?: string;
[ProviderCredentialFields.REGISTRY_TOKEN]?: string;
[ProviderCredentialFields.IMAGE_FILTER]?: string;
[ProviderCredentialFields.TAG_FILTER]?: string;
[ProviderCredentialFields.PROVIDER_ID]: string;
};
export type OCICredentials = {
[ProviderCredentialFields.OCI_USER]: string;
[ProviderCredentialFields.OCI_FINGERPRINT]: string;
@@ -343,12 +334,6 @@ export type AlibabaCloudCredentialsRole = {
[ProviderCredentialFields.PROVIDER_ID]: string;
};
export type OpenStackCredentials = {
[ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CONTENT]: string;
[ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CLOUD]: string;
[ProviderCredentialFields.PROVIDER_ID]: string;
};
export type CredentialsFormSchema =
| AWSCredentials
| AWSCredentialsRole
@@ -357,13 +342,11 @@ export type CredentialsFormSchema =
| GCPServiceAccountKey
| KubernetesCredentials
| IacCredentials
| ImageCredentials
| M365Credentials
| OCICredentials
| MongoDBAtlasCredentials
| AlibabaCloudCredentials
| AlibabaCloudCredentialsRole
| OpenStackCredentials;
| AlibabaCloudCredentialsRole;
export interface SearchParamsProps {
[key: string]: string | string[] | undefined;

View File

@@ -115,11 +115,6 @@ export const addProviderFormSchema = z
[ProviderCredentialFields.PROVIDER_ALIAS]: z.string(),
providerUid: z.string(),
}),
z.object({
providerType: z.literal("image"),
[ProviderCredentialFields.PROVIDER_ALIAS]: z.string(),
providerUid: z.string(),
}),
z.object({
providerType: z.literal("oraclecloud"),
[ProviderCredentialFields.PROVIDER_ALIAS]: z.string(),
@@ -135,11 +130,6 @@ export const addProviderFormSchema = z
[ProviderCredentialFields.PROVIDER_ALIAS]: z.string(),
providerUid: z.string(),
}),
z.object({
providerType: z.literal("openstack"),
[ProviderCredentialFields.PROVIDER_ALIAS]: z.string(),
providerUid: z.string(),
}),
]),
);
@@ -269,37 +259,7 @@ export const addCredentialsFormSchema = (
.string()
.min(1, "Access Key Secret is required"),
}
: providerType === "image"
? {
[ProviderCredentialFields.REGISTRY_USERNAME]: z
.string()
.optional(),
[ProviderCredentialFields.REGISTRY_PASSWORD]: z
.string()
.optional(),
[ProviderCredentialFields.REGISTRY_TOKEN]: z
.string()
.optional(),
[ProviderCredentialFields.IMAGE_FILTER]: z
.string()
.optional(),
[ProviderCredentialFields.TAG_FILTER]: z
.string()
.optional(),
}
: providerType === "openstack"
? {
[ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CONTENT]:
z
.string()
.min(
1,
"Clouds YAML content is required",
),
[ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CLOUD]:
z.string().min(1, "Cloud name is required"),
}
: {}),
: {}),
})
.superRefine((data: Record<string, string | undefined>, ctx) => {
if (providerType === "m365") {

View File

@@ -7,10 +7,8 @@ export const PROVIDER_TYPES = [
"mongodbatlas",
"github",
"iac",
"image",
"oraclecloud",
"alibabacloud",
"openstack",
] as const;
export type ProviderType = (typeof PROVIDER_TYPES)[number];
@@ -24,10 +22,8 @@ export const PROVIDER_DISPLAY_NAMES: Record<ProviderType, string> = {
mongodbatlas: "MongoDB Atlas",
github: "GitHub",
iac: "Infrastructure as Code",
image: "Container Registry",
oraclecloud: "Oracle Cloud Infrastructure",
alibabacloud: "Alibaba Cloud",
openstack: "OpenStack",
};
export function getProviderDisplayName(providerId: string): string {