From c159181d2773396e24449e91d472d8fedc4b5e98 Mon Sep 17 00:00:00 2001
From: Andoni Alonso <14891798+andoniaf@users.noreply.github.com>
Date: Tue, 24 Feb 2026 13:06:34 +0100
Subject: [PATCH] feat(api): add Image provider support for container image
scanning (#10128)
---
api/CHANGELOG.md | 1 +
api/Dockerfile | 2 +-
.../api/migrations/0083_image_provider.py | 38 ++
api/src/backend/api/models.py | 10 +
api/src/backend/api/tests/test_serializers.py | 35 ++
api/src/backend/api/tests/test_utils.py | 160 +++++++
api/src/backend/api/utils.py | 49 ++-
api/src/backend/api/v1/serializers.py | 26 ++
api/src/backend/tasks/jobs/export.py | 1 +
poetry.lock | 4 +-
prowler/lib/check/checks_loader.py | 4 +-
prowler/lib/outputs/finding.py | 8 +-
prowler/lib/outputs/html/html.py | 50 +++
prowler/lib/scan/scan.py | 86 +++-
prowler/providers/common/provider.py | 13 -
prowler/providers/image/image_provider.py | 217 +++++++---
prowler/providers/image/lib/registry/base.py | 38 +-
.../image/lib/registry/dockerhub_adapter.py | 8 +-
tests/lib/outputs/html/html_test.py | 88 +++-
tests/providers/image/image_fixtures.py | 55 ++-
tests/providers/image/image_provider_test.py | 397 ++++++++++--------
.../lib/registry/test_dockerhub_adapter.py | 82 +++-
.../image/lib/registry/test_oci_adapter.py | 30 +-
.../lib/registry/test_provider_registry.py | 22 +-
24 files changed, 1120 insertions(+), 304 deletions(-)
create mode 100644 api/src/backend/api/migrations/0083_image_provider.py
diff --git a/api/CHANGELOG.md b/api/CHANGELOG.md
index 63034c4352..dd239a00e6 100644
--- a/api/CHANGELOG.md
+++ b/api/CHANGELOG.md
@@ -9,6 +9,7 @@ All notable changes to the **Prowler API** are documented in this file.
- Finding group summaries and resources endpoints for hierarchical findings views [(#9961)](https://github.com/prowler-cloud/prowler/pull/9961)
- OpenStack provider support [(#10003)](https://github.com/prowler-cloud/prowler/pull/10003)
- PDF report for the CSA CCM compliance framework [(#10088)](https://github.com/prowler-cloud/prowler/pull/10088)
+- `image` provider support for container image scanning [(#10128)](https://github.com/prowler-cloud/prowler/pull/10128)
### 🔄 Changed
diff --git a/api/Dockerfile b/api/Dockerfile
index a4d5d177cf..43abe7d82d 100644
--- a/api/Dockerfile
+++ b/api/Dockerfile
@@ -5,7 +5,7 @@ LABEL maintainer="https://github.com/prowler-cloud/api"
ARG POWERSHELL_VERSION=7.5.0
ENV POWERSHELL_VERSION=${POWERSHELL_VERSION}
-ARG TRIVY_VERSION=0.66.0
+ARG TRIVY_VERSION=0.69.1
ENV TRIVY_VERSION=${TRIVY_VERSION}
# hadolint ignore=DL3008
diff --git a/api/src/backend/api/migrations/0083_image_provider.py b/api/src/backend/api/migrations/0083_image_provider.py
new file mode 100644
index 0000000000..936fae2219
--- /dev/null
+++ b/api/src/backend/api/migrations/0083_image_provider.py
@@ -0,0 +1,38 @@
+from django.db import migrations
+
+import api.db_utils
+
+
+class Migration(migrations.Migration):
+ dependencies = [
+ ("api", "0082_backfill_finding_group_summaries"),
+ ]
+
+ operations = [
+ migrations.AlterField(
+ model_name="provider",
+ name="provider",
+ field=api.db_utils.ProviderEnumField(
+ choices=[
+ ("aws", "AWS"),
+ ("azure", "Azure"),
+ ("gcp", "GCP"),
+ ("kubernetes", "Kubernetes"),
+ ("m365", "M365"),
+ ("github", "GitHub"),
+ ("mongodbatlas", "MongoDB Atlas"),
+ ("iac", "IaC"),
+ ("oraclecloud", "Oracle Cloud Infrastructure"),
+ ("alibabacloud", "Alibaba Cloud"),
+ ("cloudflare", "Cloudflare"),
+ ("openstack", "OpenStack"),
+ ("image", "Image"),
+ ],
+ default="aws",
+ ),
+ ),
+ migrations.RunSQL(
+ "ALTER TYPE provider ADD VALUE IF NOT EXISTS 'image';",
+ reverse_sql=migrations.RunSQL.noop,
+ ),
+ ]
diff --git a/api/src/backend/api/models.py b/api/src/backend/api/models.py
index 882abeb7b2..1f82c4a7da 100644
--- a/api/src/backend/api/models.py
+++ b/api/src/backend/api/models.py
@@ -292,6 +292,7 @@ class Provider(RowLevelSecurityProtectedModel):
ALIBABACLOUD = "alibabacloud", _("Alibaba Cloud")
CLOUDFLARE = "cloudflare", _("Cloudflare")
OPENSTACK = "openstack", _("OpenStack")
+ IMAGE = "image", _("Image")
@staticmethod
def validate_aws_uid(value):
@@ -426,6 +427,15 @@ class Provider(RowLevelSecurityProtectedModel):
pointer="/data/attributes/uid",
)
+ @staticmethod
+ def validate_image_uid(value):
+ if not re.match(r"^[a-zA-Z0-9][a-zA-Z0-9._/:@-]{2,249}$", value):
+ raise ModelValidationError(
+ detail="Image provider ID must be a valid container image reference.",
+ code="image-uid",
+ pointer="/data/attributes/uid",
+ )
+
id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
inserted_at = models.DateTimeField(auto_now_add=True, editable=False)
updated_at = models.DateTimeField(auto_now=True, editable=False)
diff --git a/api/src/backend/api/tests/test_serializers.py b/api/src/backend/api/tests/test_serializers.py
index a52b3464d8..5810a97b63 100644
--- a/api/src/backend/api/tests/test_serializers.py
+++ b/api/src/backend/api/tests/test_serializers.py
@@ -2,6 +2,7 @@ import pytest
from rest_framework.exceptions import ValidationError
from api.v1.serializer_utils.integrations import S3ConfigSerializer
+from api.v1.serializers import ImageProviderSecret
class TestS3ConfigSerializer:
@@ -98,3 +99,37 @@ class TestS3ConfigSerializer:
serializer = S3ConfigSerializer(data=data)
assert not serializer.is_valid()
assert "output_directory" in serializer.errors
+
+
+class TestImageProviderSecret:
+ """Test cases for ImageProviderSecret validation."""
+
+ def test_valid_no_credentials(self):
+ serializer = ImageProviderSecret(data={})
+ assert serializer.is_valid()
+
+ def test_valid_token_only(self):
+ serializer = ImageProviderSecret(data={"registry_token": "tok"})
+ assert serializer.is_valid()
+
+ def test_valid_username_and_password(self):
+ serializer = ImageProviderSecret(
+ data={"registry_username": "user", "registry_password": "pass"}
+ )
+ assert serializer.is_valid()
+
+ def test_valid_token_with_username_only(self):
+ serializer = ImageProviderSecret(
+ data={"registry_token": "tok", "registry_username": "user"}
+ )
+ assert serializer.is_valid()
+
+ def test_invalid_username_without_password(self):
+ serializer = ImageProviderSecret(data={"registry_username": "user"})
+ assert not serializer.is_valid()
+ assert "non_field_errors" in serializer.errors
+
+ def test_invalid_password_without_username(self):
+ serializer = ImageProviderSecret(data={"registry_password": "pass"})
+ assert not serializer.is_valid()
+ assert "non_field_errors" in serializer.errors
diff --git a/api/src/backend/api/tests/test_utils.py b/api/src/backend/api/tests/test_utils.py
index 236cbf87a2..24c051717d 100644
--- a/api/src/backend/api/tests/test_utils.py
+++ b/api/src/backend/api/tests/test_utils.py
@@ -24,6 +24,7 @@ from prowler.providers.cloudflare.cloudflare_provider import CloudflareProvider
from prowler.providers.gcp.gcp_provider import GcpProvider
from prowler.providers.github.github_provider import GithubProvider
from prowler.providers.iac.iac_provider import IacProvider
+from prowler.providers.image.image_provider import ImageProvider
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
from prowler.providers.m365.m365_provider import M365Provider
from prowler.providers.mongodbatlas.mongodbatlas_provider import MongodbatlasProvider
@@ -122,6 +123,7 @@ class TestReturnProwlerProvider:
(Provider.ProviderChoices.ALIBABACLOUD.value, AlibabacloudProvider),
(Provider.ProviderChoices.CLOUDFLARE.value, CloudflareProvider),
(Provider.ProviderChoices.OPENSTACK.value, OpenstackProvider),
+ (Provider.ProviderChoices.IMAGE.value, ImageProvider),
],
)
def test_return_prowler_provider(self, provider_type, expected_provider):
@@ -188,6 +190,47 @@ class TestProwlerProviderConnectionTest:
assert isinstance(connection.error, Provider.secret.RelatedObjectDoesNotExist)
assert str(connection.error) == "Provider has no secret."
+ @patch("api.utils.return_prowler_provider")
+ def test_prowler_provider_connection_test_image_provider(
+ self, mock_return_prowler_provider
+ ):
+ """Test connection test for Image provider with credentials."""
+ provider = MagicMock()
+ provider.uid = "docker.io/myns/myimage:latest"
+ provider.provider = Provider.ProviderChoices.IMAGE.value
+ provider.secret.secret = {
+ "registry_username": "user",
+ "registry_password": "pass",
+ "registry_token": "tok123",
+ }
+ mock_return_prowler_provider.return_value = MagicMock()
+
+ prowler_provider_connection_test(provider)
+ mock_return_prowler_provider.return_value.test_connection.assert_called_once_with(
+ image="docker.io/myns/myimage:latest",
+ raise_on_exception=False,
+ registry_username="user",
+ registry_password="pass",
+ registry_token="tok123",
+ )
+
+ @patch("api.utils.return_prowler_provider")
+ def test_prowler_provider_connection_test_image_provider_no_creds(
+ self, mock_return_prowler_provider
+ ):
+ """Test connection test for Image provider without credentials."""
+ provider = MagicMock()
+ provider.uid = "alpine:3.18"
+ provider.provider = Provider.ProviderChoices.IMAGE.value
+ provider.secret.secret = {}
+ mock_return_prowler_provider.return_value = MagicMock()
+
+ prowler_provider_connection_test(provider)
+ mock_return_prowler_provider.return_value.test_connection.assert_called_once_with(
+ image="alpine:3.18",
+ raise_on_exception=False,
+ )
+
class TestGetProwlerProviderKwargs:
@pytest.mark.parametrize(
@@ -336,6 +379,123 @@ class TestGetProwlerProviderKwargs:
}
assert result == expected_result
+ def test_get_prowler_provider_kwargs_image_provider_registry_url(self):
+ """Test that Image provider with a registry URL gets 'registry' kwarg."""
+ provider_uid = "docker.io/myns"
+ secret_dict = {
+ "registry_username": "user",
+ "registry_password": "pass",
+ }
+ secret_mock = MagicMock()
+ secret_mock.secret = secret_dict
+
+ provider = MagicMock()
+ provider.provider = Provider.ProviderChoices.IMAGE.value
+ provider.secret = secret_mock
+ provider.uid = provider_uid
+
+ result = get_prowler_provider_kwargs(provider)
+
+ expected_result = {
+ "registry": provider_uid,
+ "registry_username": "user",
+ "registry_password": "pass",
+ }
+ assert result == expected_result
+
+ def test_get_prowler_provider_kwargs_image_provider_image_ref(self):
+ """Test that Image provider with a full image reference gets 'images' kwarg."""
+ provider_uid = "docker.io/myns/myimage:latest"
+ secret_dict = {
+ "registry_username": "user",
+ "registry_password": "pass",
+ }
+ secret_mock = MagicMock()
+ secret_mock.secret = secret_dict
+
+ provider = MagicMock()
+ provider.provider = Provider.ProviderChoices.IMAGE.value
+ provider.secret = secret_mock
+ provider.uid = provider_uid
+
+ result = get_prowler_provider_kwargs(provider)
+
+ expected_result = {
+ "images": [provider_uid],
+ "registry_username": "user",
+ "registry_password": "pass",
+ }
+ assert result == expected_result
+
+ def test_get_prowler_provider_kwargs_image_provider_dockerhub_image(self):
+ """Test that Image provider with a short DockerHub image gets 'images' kwarg."""
+ provider_uid = "alpine:3.18"
+ secret_dict = {}
+ secret_mock = MagicMock()
+ secret_mock.secret = secret_dict
+
+ provider = MagicMock()
+ provider.provider = Provider.ProviderChoices.IMAGE.value
+ provider.secret = secret_mock
+ provider.uid = provider_uid
+
+ result = get_prowler_provider_kwargs(provider)
+
+ expected_result = {"images": [provider_uid]}
+ assert result == expected_result
+
+ def test_get_prowler_provider_kwargs_image_provider_filters_falsy_secrets(self):
+ """Test that falsy secret values are filtered out for Image provider."""
+ provider_uid = "docker.io/myns/myimage:latest"
+ secret_dict = {
+ "registry_username": "",
+ "registry_password": "",
+ }
+ secret_mock = MagicMock()
+ secret_mock.secret = secret_dict
+
+ provider = MagicMock()
+ provider.provider = Provider.ProviderChoices.IMAGE.value
+ provider.secret = secret_mock
+ provider.uid = provider_uid
+
+ result = get_prowler_provider_kwargs(provider)
+
+ expected_result = {"images": [provider_uid]}
+ assert result == expected_result
+
+ def test_get_prowler_provider_kwargs_image_provider_ignores_mutelist(self):
+ """Test that Image provider does NOT receive mutelist_content.
+
+ Image provider uses Trivy's built-in mutelist logic, so it should not
+ receive mutelist_content even when a mutelist processor is configured.
+ """
+ provider_uid = "docker.io/myns/myimage:latest"
+ secret_dict = {
+ "registry_username": "user",
+ "registry_password": "pass",
+ }
+ secret_mock = MagicMock()
+ secret_mock.secret = secret_dict
+
+ mutelist_processor = MagicMock()
+ mutelist_processor.configuration = {"Mutelist": {"key": "value"}}
+
+ provider = MagicMock()
+ provider.provider = Provider.ProviderChoices.IMAGE.value
+ provider.secret = secret_mock
+ provider.uid = provider_uid
+
+ result = get_prowler_provider_kwargs(provider, mutelist_processor)
+
+ assert "mutelist_content" not in result
+ expected_result = {
+ "images": [provider_uid],
+ "registry_username": "user",
+ "registry_password": "pass",
+ }
+ assert result == expected_result
+
def test_get_prowler_provider_kwargs_unsupported_provider(self):
# Setup
provider_uid = "provider_uid"
diff --git a/api/src/backend/api/utils.py b/api/src/backend/api/utils.py
index 6d8cb3f99b..d856bd5e6a 100644
--- a/api/src/backend/api/utils.py
+++ b/api/src/backend/api/utils.py
@@ -28,6 +28,7 @@ if TYPE_CHECKING:
from prowler.providers.gcp.gcp_provider import GcpProvider
from prowler.providers.github.github_provider import GithubProvider
from prowler.providers.iac.iac_provider import IacProvider
+ from prowler.providers.image.image_provider import ImageProvider
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
from prowler.providers.m365.m365_provider import M365Provider
from prowler.providers.mongodbatlas.mongodbatlas_provider import (
@@ -83,6 +84,7 @@ def return_prowler_provider(
| GcpProvider
| GithubProvider
| IacProvider
+ | ImageProvider
| KubernetesProvider
| M365Provider
| MongodbatlasProvider
@@ -95,7 +97,7 @@ def return_prowler_provider(
provider (Provider): The provider object containing the provider type and associated secrets.
Returns:
- AlibabacloudProvider | AwsProvider | AzureProvider | CloudflareProvider | GcpProvider | GithubProvider | IacProvider | KubernetesProvider | M365Provider | MongodbatlasProvider | OpenstackProvider | OraclecloudProvider: The corresponding provider class.
+ AlibabacloudProvider | AwsProvider | AzureProvider | CloudflareProvider | GcpProvider | GithubProvider | IacProvider | ImageProvider | KubernetesProvider | M365Provider | MongodbatlasProvider | OpenstackProvider | OraclecloudProvider: The corresponding provider class.
Raises:
ValueError: If the provider type specified in `provider.provider` is not supported.
@@ -159,6 +161,10 @@ def return_prowler_provider(
from prowler.providers.openstack.openstack_provider import OpenstackProvider
prowler_provider = OpenstackProvider
+ case Provider.ProviderChoices.IMAGE.value:
+ from prowler.providers.image.image_provider import ImageProvider
+
+ prowler_provider = ImageProvider
case _:
raise ValueError(f"Provider type {provider.provider} not supported")
return prowler_provider
@@ -219,11 +225,29 @@ def get_prowler_provider_kwargs(
# clouds_yaml_content, clouds_yaml_cloud and provider_id are validated
# in the provider itself, so it's not needed here.
pass
+ elif provider.provider == Provider.ProviderChoices.IMAGE.value:
+ # Detect whether uid is a registry URL (e.g. "docker.io/andoniaf") or
+ # a concrete image reference (e.g. "docker.io/andoniaf/myimage:latest").
+ from prowler.providers.image.image_provider import ImageProvider
+
+ if ImageProvider._is_registry_url(provider.uid):
+ prowler_provider_kwargs = {
+ "registry": provider.uid,
+ **{k: v for k, v in prowler_provider_kwargs.items() if v},
+ }
+ else:
+ prowler_provider_kwargs = {
+ "images": [provider.uid],
+ **{k: v for k, v in prowler_provider_kwargs.items() if v},
+ }
if mutelist_processor:
mutelist_content = mutelist_processor.configuration.get("Mutelist", {})
- # IaC provider doesn't support mutelist (uses Trivy's built-in logic)
- if mutelist_content and provider.provider != Provider.ProviderChoices.IAC.value:
+ # IaC and Image providers don't support mutelist (both use Trivy's built-in logic)
+ if mutelist_content and provider.provider not in (
+ Provider.ProviderChoices.IAC.value,
+ Provider.ProviderChoices.IMAGE.value,
+ ):
prowler_provider_kwargs["mutelist_content"] = mutelist_content
return prowler_provider_kwargs
@@ -240,6 +264,7 @@ def initialize_prowler_provider(
| GcpProvider
| GithubProvider
| IacProvider
+ | ImageProvider
| KubernetesProvider
| M365Provider
| MongodbatlasProvider
@@ -253,7 +278,7 @@ def initialize_prowler_provider(
mutelist_processor (Processor): The mutelist processor object containing the mutelist configuration.
Returns:
- AlibabacloudProvider | AwsProvider | AzureProvider | CloudflareProvider | GcpProvider | GithubProvider | IacProvider | KubernetesProvider | M365Provider | MongodbatlasProvider | OpenstackProvider | OraclecloudProvider: An instance of the corresponding provider class
+ AlibabacloudProvider | AwsProvider | AzureProvider | CloudflareProvider | GcpProvider | GithubProvider | IacProvider | ImageProvider | KubernetesProvider | M365Provider | MongodbatlasProvider | OpenstackProvider | OraclecloudProvider: An instance of the corresponding provider class
initialized with the provider's secrets.
"""
prowler_provider = return_prowler_provider(provider)
@@ -296,6 +321,22 @@ def prowler_provider_connection_test(provider: Provider) -> Connection:
"raise_on_exception": False,
}
return prowler_provider.test_connection(**openstack_kwargs)
+ elif provider.provider == Provider.ProviderChoices.IMAGE.value:
+ image_kwargs = {
+ "image": provider.uid,
+ "raise_on_exception": False,
+ }
+ if prowler_provider_kwargs.get("registry_username"):
+ image_kwargs["registry_username"] = prowler_provider_kwargs[
+ "registry_username"
+ ]
+ if prowler_provider_kwargs.get("registry_password"):
+ image_kwargs["registry_password"] = prowler_provider_kwargs[
+ "registry_password"
+ ]
+ if prowler_provider_kwargs.get("registry_token"):
+ image_kwargs["registry_token"] = prowler_provider_kwargs["registry_token"]
+ return prowler_provider.test_connection(**image_kwargs)
else:
return prowler_provider.test_connection(
**prowler_provider_kwargs,
diff --git a/api/src/backend/api/v1/serializers.py b/api/src/backend/api/v1/serializers.py
index 18619a2384..babc1d95bc 100644
--- a/api/src/backend/api/v1/serializers.py
+++ b/api/src/backend/api/v1/serializers.py
@@ -1528,6 +1528,8 @@ class BaseWriteProviderSecretSerializer(BaseWriteSerializer):
)
elif provider_type == Provider.ProviderChoices.OPENSTACK.value:
serializer = OpenStackCloudsYamlProviderSecret(data=secret)
+ elif provider_type == Provider.ProviderChoices.IMAGE.value:
+ serializer = ImageProviderSecret(data=secret)
else:
raise serializers.ValidationError(
{"provider": f"Provider type not supported {provider_type}"}
@@ -1702,6 +1704,30 @@ class OpenStackCloudsYamlProviderSecret(serializers.Serializer):
resource_name = "provider-secrets"
+class ImageProviderSecret(serializers.Serializer):
+ registry_username = serializers.CharField(required=False)
+ registry_password = serializers.CharField(required=False)
+ registry_token = serializers.CharField(required=False)
+
+ class Meta:
+ resource_name = "provider-secrets"
+
+ def validate(self, attrs):
+ token = attrs.get("registry_token")
+ username = attrs.get("registry_username")
+ password = attrs.get("registry_password")
+ if not token:
+ if username and not password:
+ raise serializers.ValidationError(
+ "registry_password is required when registry_username is provided."
+ )
+ if password and not username:
+ raise serializers.ValidationError(
+ "registry_username is required when registry_password is provided."
+ )
+ return attrs
+
+
class AlibabaCloudProviderSecret(serializers.Serializer):
access_key_id = serializers.CharField()
access_key_secret = serializers.CharField()
diff --git a/api/src/backend/tasks/jobs/export.py b/api/src/backend/tasks/jobs/export.py
index c2a947d249..4b8498f7e7 100644
--- a/api/src/backend/tasks/jobs/export.py
+++ b/api/src/backend/tasks/jobs/export.py
@@ -137,6 +137,7 @@ COMPLIANCE_CLASS_MAP = {
# IaC provider doesn't have specific compliance frameworks yet
# Trivy handles its own compliance checks
],
+ "image": [],
"oraclecloud": [
(lambda name: name.startswith("cis_"), OracleCloudCIS),
(lambda name: name.startswith("csa_"), OracleCloudCSA),
diff --git a/poetry.lock b/poetry.lock
index 092d6351fe..abc498d59f 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
+# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
[[package]]
name = "about-time"
@@ -4874,7 +4874,7 @@ description = "C parser in Python"
optional = false
python-versions = ">=3.8"
groups = ["main", "dev"]
-markers = "implementation_name != \"PyPy\" and platform_python_implementation != \"PyPy\""
+markers = "platform_python_implementation != \"PyPy\" and implementation_name != \"PyPy\""
files = [
{file = "pycparser-2.22-py3-none-any.whl", hash = "sha256:c3702b6d3dd8c7abc1afa565d7e63d53a1d0bd86cdc24edd75470f4de499cfcc"},
{file = "pycparser-2.22.tar.gz", hash = "sha256:491c8be9c040f5390f5bf44a5b07752bd07f56edf992381b05c701439eec10f6"},
diff --git a/prowler/lib/check/checks_loader.py b/prowler/lib/check/checks_loader.py
index dab5535d77..bc1ea937be 100644
--- a/prowler/lib/check/checks_loader.py
+++ b/prowler/lib/check/checks_loader.py
@@ -22,8 +22,8 @@ def load_checks_to_execute(
) -> set:
"""Generate the list of checks to execute based on the cloud provider and the input arguments given"""
try:
- # Bypass check loading for IAC provider since it uses Trivy directly
- if provider == "iac":
+ # Bypass check loading for providers that use Trivy directly
+ if provider in ("iac", "image"):
return set()
# Local subsets
diff --git a/prowler/lib/outputs/finding.py b/prowler/lib/outputs/finding.py
index 82f64850f8..2c2d5a61f2 100644
--- a/prowler/lib/outputs/finding.py
+++ b/prowler/lib/outputs/finding.py
@@ -384,10 +384,12 @@ class Finding(BaseModel):
output_data["auth_method"] = provider.auth_method
output_data["account_uid"] = "image"
output_data["account_name"] = "image"
- output_data["resource_name"] = getattr(
- check_output, "resource_name", ""
+ image_name = getattr(check_output, "resource_name", "")
+ image_sha = getattr(check_output, "image_sha", "")
+ output_data["resource_name"] = image_name
+ output_data["resource_uid"] = (
+ f"{image_name}:{image_sha}" if image_sha else image_name
)
- output_data["resource_uid"] = getattr(check_output, "resource_id", "")
output_data["region"] = getattr(check_output, "region", "container")
output_data["package_name"] = getattr(check_output, "package_name", "")
output_data["installed_version"] = getattr(
diff --git a/prowler/lib/outputs/html/html.py b/prowler/lib/outputs/html/html.py
index 779fa2d776..26ee798021 100644
--- a/prowler/lib/outputs/html/html.py
+++ b/prowler/lib/outputs/html/html.py
@@ -930,6 +930,56 @@ class HTML(Output):
)
return ""
+ @staticmethod
+ def get_image_assessment_summary(provider: Provider) -> str:
+ """
+ get_image_assessment_summary gets the HTML assessment summary for the Image provider
+
+ Args:
+ provider (Provider): the Image provider object
+
+ Returns:
+ str: the HTML assessment summary
+ """
+ try:
+ if provider.registry:
+ target_info = f"Registry URL: {provider.registry}"
+ else:
+ target_info = f'Images: {", ".join(provider.images)}'
+
+ return f"""
+
+
+
+
+
+ -
+ Image authentication method: {provider.auth_method}
+
+
+
+
"""
+ except Exception as error:
+ logger.error(
+ f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
+ )
+ return ""
+
@staticmethod
def get_llm_assessment_summary(provider: Provider) -> str:
"""
diff --git a/prowler/lib/scan/scan.py b/prowler/lib/scan/scan.py
index 6af2d26fbc..0b38f16b1b 100644
--- a/prowler/lib/scan/scan.py
+++ b/prowler/lib/scan/scan.py
@@ -27,6 +27,7 @@ from prowler.lib.scan.exceptions.exceptions import (
from prowler.providers.common.models import Audit_Metadata, ProviderOutputOptions
from prowler.providers.common.provider import Provider
from prowler.providers.iac.iac_provider import IacProvider
+from prowler.providers.image.image_provider import ImageProvider
class Scan:
@@ -92,10 +93,10 @@ class Scan:
except ValueError:
raise ScanInvalidStatusError(f"Invalid status provided: {s}.")
- # Special setup for IaC provider - override inputs to work with traditional flow
- if provider.type == "iac":
- # IaC doesn't use traditional Prowler checks, so clear all input parameters
- # to avoid validation errors and let it flow through the normal logic
+ # Special setup for IaC/Image providers - override inputs to work with traditional flow
+ if provider.type in ("iac", "image"):
+ # These providers don't use traditional Prowler checks, so clear all input parameters
+ # to avoid validation errors and let them flow through the normal logic
checks = None
services = None
excluded_checks = None
@@ -160,8 +161,8 @@ class Scan:
)
# Load checks to execute
- if provider.type == "iac":
- self._checks_to_execute = ["iac_scan"] # Dummy check name for IaC
+ if provider.type in ("iac", "image"):
+ self._checks_to_execute = [f"{provider.type}_scan"]
else:
self._checks_to_execute = sorted(
load_checks_to_execute(
@@ -200,8 +201,8 @@ class Scan:
self._number_of_checks_to_execute = len(self._checks_to_execute)
# Set up service-based checks tracking
- if provider.type == "iac":
- service_checks_to_execute = {"iac": set(["iac_scan"])}
+ if provider.type in ("iac", "image"):
+ service_checks_to_execute = {provider.type: set([f"{provider.type}_scan"])}
else:
service_checks_to_execute = get_service_checks_to_execute(
self._checks_to_execute
@@ -346,6 +347,75 @@ class Scan:
self._duration = int((end_time - start_time).total_seconds())
return
+ # Special handling for Image provider
+ elif self._provider.type == "image":
+ if isinstance(self._provider, ImageProvider):
+ logger.info("Running Image scan with Trivy...")
+
+ total_images = len(self._provider.images)
+ images_completed = 0
+
+ for image_name, image_findings in self._provider.scan_per_image():
+ findings = []
+
+ for report in image_findings:
+ finding_uid = f"{report.check_metadata.CheckID}-{report.resource_name}-{report.resource_id}"
+
+ status_enum = (
+ Status.FAIL if report.status == "FAIL" else Status.PASS
+ )
+ if report.muted:
+ status_enum = Status.MUTED
+
+ image_sha = getattr(report, "image_sha", "")
+ resource_uid = (
+ f"{image_name}:{image_sha}" if image_sha else image_name
+ )
+
+ finding = Finding(
+ auth_method="Registry",
+ timestamp=datetime.datetime.now(timezone.utc),
+ account_uid=getattr(self._provider, "registry", None)
+ or "image",
+ account_name="Container Registry",
+ metadata=report.check_metadata,
+ uid=finding_uid,
+ status=status_enum,
+ status_extended=report.status_extended,
+ muted=report.muted,
+ resource_uid=resource_uid,
+ resource_metadata=report.resource,
+ resource_name=image_name,
+ resource_details=report.resource_details,
+ resource_tags={},
+ region=report.region,
+ compliance={},
+ raw=report.resource,
+ )
+ findings.append(finding)
+
+ # Filter the findings by the status
+ if self._status:
+ findings = [f for f in findings if f.status in self._status]
+
+ images_completed += 1
+ progress = (
+ images_completed / total_images * 100
+ if total_images > 0
+ else 100.0
+ )
+
+ yield (progress, findings)
+
+ # Update progress
+ self._number_of_checks_completed = 1
+ self._number_of_checks_to_execute = 1
+
+ # Calculate duration
+ end_time = datetime.datetime.now()
+ self._duration = int((end_time - start_time).total_seconds())
+ return
+
for check_name in checks_to_execute:
try:
# Recover service from check name
diff --git a/prowler/providers/common/provider.py b/prowler/providers/common/provider.py
index e43fa71c7d..6f29c0b800 100644
--- a/prowler/providers/common/provider.py
+++ b/prowler/providers/common/provider.py
@@ -285,19 +285,6 @@ class Provider(ABC):
timeout=arguments.timeout,
config_path=arguments.config_file,
fixer_config=fixer_config,
- registry_username=getattr(arguments, "registry_username", None),
- registry_password=getattr(arguments, "registry_password", None),
- registry_token=getattr(arguments, "registry_token", None),
- registry=getattr(arguments, "registry", None),
- image_filter=getattr(arguments, "image_filter", None),
- tag_filter=getattr(arguments, "tag_filter", None),
- max_images=getattr(arguments, "max_images", 0),
- registry_insecure=getattr(
- arguments, "registry_insecure", False
- ),
- registry_list_images=getattr(
- arguments, "registry_list_images", False
- ),
)
elif "mongodbatlas" in provider_class_name.lower():
provider_class(
diff --git a/prowler/providers/image/image_provider.py b/prowler/providers/image/image_provider.py
index 15f4e14093..9817656dd2 100644
--- a/prowler/providers/image/image_provider.py
+++ b/prowler/providers/image/image_provider.py
@@ -31,6 +31,9 @@ from prowler.providers.image.exceptions.exceptions import (
ImageListFileReadError,
ImageMaxImagesExceededError,
ImageNoImagesProvidedError,
+ ImageRegistryAuthError,
+ ImageRegistryCatalogError,
+ ImageRegistryNetworkError,
ImageScanError,
ImageTrivyBinaryNotFoundError,
)
@@ -96,6 +99,7 @@ class ImageProvider(Provider):
self.audited_account = "image-scan"
self._session = None
self._identity = "prowler"
+ self._listing_only = False
# Registry authentication (follows IaC pattern: explicit params, env vars internal)
self.registry_username = registry_username or os.environ.get(
@@ -107,8 +111,8 @@ class ImageProvider(Provider):
self.registry_token = registry_token or os.environ.get("REGISTRY_TOKEN")
if self.registry_username and self.registry_password:
- self._auth_method = "Basic auth"
- logger.info("Using basic auth for registry authentication")
+ self._auth_method = "Docker login"
+ logger.info("Using docker login for registry authentication")
elif self.registry_token:
self._auth_method = "Registry token"
logger.info("Using registry token for authentication")
@@ -152,6 +156,8 @@ class ImageProvider(Provider):
# Registry scan mode: enumerate images from registry
if self.registry:
self._enumerate_registry()
+ if self._listing_only:
+ return
for image in self.images:
self._validate_image_name(image)
@@ -319,40 +325,61 @@ class ImageProvider(Provider):
return parts[0]
return None
+ @staticmethod
+ def _is_registry_url(image_uid: str) -> bool:
+ """Determine whether an image UID is a registry URL (namespace only).
+
+ A registry URL like ``docker.io/andoniaf`` has a registry host but
+ the remaining part contains no ``/`` (no repo) and no ``:`` (no tag).
+ """
+ registry_host = ImageProvider._extract_registry(image_uid)
+ if not registry_host:
+ return False
+ repo_and_tag = image_uid[len(registry_host) + 1 :]
+ return "/" not in repo_and_tag and ":" not in repo_and_tag
+
def cleanup(self) -> None:
"""Clean up any resources after scanning."""
def _process_finding(
- self, finding: dict, image_name: str, finding_type: str
+ self,
+ finding: dict,
+ image: str,
+ trivy_target: str,
+ image_sha: str = "",
) -> CheckReportImage:
"""
Process a single finding and create a CheckReportImage object.
Args:
finding: The finding object from Trivy output
- image_name: The container image name being scanned
- finding_type: The type of finding (Vulnerability, Secret, etc.)
+ image: The clean container image name (e.g., "alpine:3.18")
+ trivy_target: The Trivy target string (e.g., "alpine:3.18 (alpine 3.18.0)")
+ image_sha: Short SHA from Trivy Metadata.ImageID for resource uniqueness
Returns:
CheckReportImage: The processed check report
"""
try:
- # Determine finding ID based on type
+ # Determine finding ID and category based on type
if "VulnerabilityID" in finding:
finding_id = finding["VulnerabilityID"]
finding_description = finding.get(
"Description", finding.get("Title", "")
)
finding_status = "FAIL"
+ finding_categories = ["vulnerability"]
elif "RuleID" in finding:
# Secret finding
finding_id = finding["RuleID"]
finding_description = finding.get("Title", "Secret detected")
finding_status = "FAIL"
+ finding_categories = ["secrets"]
else:
finding_id = finding.get("ID", "UNKNOWN")
finding_description = finding.get("Description", "")
finding_status = finding.get("Status", "FAIL")
+ finding_categories = []
# Build remediation text for vulnerabilities
remediation_text = ""
@@ -371,7 +398,7 @@ class ImageProvider(Provider):
"CheckID": finding_id,
"CheckTitle": finding.get("Title", finding_id),
"CheckType": ["Container Image Security"],
- "ServiceName": finding_type,
+ "ServiceName": "container-image",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": trivy_severity,
@@ -381,7 +408,7 @@ class ImageProvider(Provider):
"Risk": finding.get(
"Description", "Vulnerability detected in container image"
),
- "RelatedUrl": finding.get("PrimaryURL", ""),
+ "RelatedUrl": "",
"Remediation": {
"Code": {
"NativeIaC": "",
@@ -394,7 +421,7 @@ class ImageProvider(Provider):
"Url": finding.get("PrimaryURL", ""),
},
},
- "Categories": [],
+ "Categories": finding_categories,
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
@@ -404,11 +431,13 @@ class ImageProvider(Provider):
metadata = json.dumps(metadata_dict)
report = CheckReportImage(
- metadata=metadata, finding=finding, image_name=image_name
+ metadata=metadata, finding=finding, image_name=image
)
report.status = finding_status
report.status_extended = self._build_status_extended(finding)
report.region = self.region
+ report.image_sha = image_sha
+ report.resource_details = trivy_target
return report
except Exception as error:
@@ -453,6 +482,29 @@ class ImageProvider(Provider):
finally:
self.cleanup()
+ def scan_per_image(
+ self,
+ ) -> Generator[tuple[str, list[CheckReportImage]], None, None]:
+ """Scan images one by one, yielding (image_name, findings) per image.
+
+ Unlike run() which returns all findings at once, this method yields
+ after each image completes, enabling progress tracking.
+ """
+ try:
+ for image in self.images:
+ try:
+ image_findings = []
+ for batch in self._scan_single_image(image):
+ image_findings.extend(batch)
+ yield (image, image_findings)
+ except (ImageScanError, ImageTrivyBinaryNotFoundError):
+ raise
+ except Exception as error:
+ logger.error(f"Error scanning image {image}: {error}")
+ yield (image, [])
+ finally:
+ self.cleanup()
+
def run_scan(self) -> Generator[list[CheckReportImage], None, None]:
"""
Run Trivy scan on all configured images.
@@ -534,6 +586,19 @@ class ImageProvider(Provider):
logger.info(f"No findings for image: {image}")
return
+ # Extract image digest for resource uniqueness
+ trivy_metadata = output.get("Metadata", {})
+ image_id = trivy_metadata.get("ImageID", "")
+ if not image_id:
+ repo_digests = trivy_metadata.get("RepoDigests", [])
+ if repo_digests:
+ image_id = (
+ repo_digests[0].split("@")[-1]
+ if "@" in repo_digests[0]
+ else ""
+ )
+ short_sha = image_id.replace("sha256:", "")[:12] if image_id else ""
+
except json.JSONDecodeError as error:
logger.error(f"Failed to parse Trivy output for {image}: {error}")
logger.debug(f"Trivy stdout: {process.stdout[:500]}")
@@ -544,11 +609,12 @@ class ImageProvider(Provider):
for result in results:
target = result.get("Target", image)
- result_type = result.get("Type", "unknown")
# Process Vulnerabilities
for vuln in result.get("Vulnerabilities", []):
- report = self._process_finding(vuln, target, result_type)
+ report = self._process_finding(
+ vuln, image, target, image_sha=short_sha
+ )
batch.append(report)
if len(batch) >= self.FINDING_BATCH_SIZE:
yield batch
@@ -556,7 +622,9 @@ class ImageProvider(Provider):
# Process Secrets
for secret in result.get("Secrets", []):
- report = self._process_finding(secret, target, "secret")
+ report = self._process_finding(
+ secret, image, target, image_sha=short_sha
+ )
batch.append(report)
if len(batch) >= self.FINDING_BATCH_SIZE:
yield batch
@@ -565,7 +633,7 @@ class ImageProvider(Provider):
# Process Misconfigurations (from Dockerfile)
for misconfig in result.get("Misconfigurations", []):
report = self._process_finding(
- misconfig, target, "misconfiguration"
+ misconfig, image, target, image_sha=short_sha
)
batch.append(report)
if len(batch) >= self.FINDING_BATCH_SIZE:
@@ -679,7 +747,7 @@ class ImageProvider(Provider):
lower = error_msg.lower()
if any(kw in lower for kw in ("401", "403", "unauthorized", "denied")):
- return f"Auth failure — check registry credentials: {error_msg}"
+ return f"Auth failure — check `docker login`: {error_msg}"
if any(kw in lower for kw in ("404", "manifest unknown", "not found")):
return f"Image not found — check name/tag/registry: {error_msg}"
if any(kw in lower for kw in ("429", "rate limit", "too many requests")):
@@ -747,10 +815,11 @@ class ImageProvider(Provider):
image_ref = f"{registry_host}/{repo}:{tag}"
discovered_images.append(image_ref)
- # Registry list mode: print listing and exit
+ # Registry list mode: print listing and return early
if self.registry_list_images:
self._print_registry_listing(repos_tags, len(discovered_images))
- raise SystemExit(0)
+ self._listing_only = True
+ return
# Check max-images limit
if self.max_images and len(discovered_images) > self.max_images:
@@ -848,10 +917,19 @@ class ImageProvider(Provider):
registry_token: str | None = None,
) -> "Connection":
"""
- Test connection to container registry by attempting to inspect an image.
+ Test connection to container registry by verifying image accessibility.
+
+ Handles two cases:
+ - Image reference (e.g. ``alpine:3.18``, ``ghcr.io/user/repo:tag``):
+ verifies the specific tag exists.
+ - Registry URL (e.g. ``docker.io/namespace``, ``ghcr.io/org``):
+ verifies we can list repositories in that namespace.
+
+ Uses registry HTTP APIs directly instead of Trivy to avoid false
+ failures caused by Trivy DB download issues.
Args:
- image: Container image to test
+ image: Container image or registry URL to test
raise_on_exception: Whether to raise exceptions
provider_id: Fallback for image name
registry_username: Registry username for basic auth
@@ -868,58 +946,65 @@ class ImageProvider(Provider):
if not image:
return Connection(is_connected=False, error="Image name is required")
- # Build env with registry credentials
- env = dict(os.environ)
- if registry_username and registry_password:
- env["TRIVY_USERNAME"] = registry_username
- env["TRIVY_PASSWORD"] = registry_password
- elif registry_token:
- env["TRIVY_REGISTRY_TOKEN"] = registry_token
-
- # Test by running trivy with --skip-update to just test image access
- process = subprocess.run(
- [
- "trivy",
- "image",
- "--skip-db-update",
- "--download-db-only=false",
- image,
- ],
- capture_output=True,
- text=True,
- timeout=60,
- env=env,
- )
-
- if process.returncode == 0:
+ if ImageProvider._is_registry_url(image):
+ # Registry enumeration mode — test by listing repositories
+ adapter = create_registry_adapter(
+ registry_url=image,
+ username=registry_username,
+ password=registry_password,
+ token=registry_token,
+ )
+ adapter.list_repositories()
return Connection(is_connected=True)
- else:
- error_msg = process.stderr or "Unknown error"
- if "401" in error_msg or "unauthorized" in error_msg.lower():
- return Connection(
- is_connected=False,
- error="Authentication failed. Check registry credentials.",
- )
- elif "not found" in error_msg.lower() or "404" in error_msg:
- return Connection(
- is_connected=False,
- error="Image not found in registry.",
- )
- else:
- return Connection(
- is_connected=False,
- error=f"Failed to access image: {error_msg[:200]}",
- )
- except subprocess.TimeoutExpired:
- return Connection(
- is_connected=False,
- error="Connection timed out",
+ # Image reference mode — verify the specific tag exists
+ registry_host = ImageProvider._extract_registry(image)
+ repo_and_tag = image[len(registry_host) + 1 :] if registry_host else image
+ if ":" in repo_and_tag:
+ repository, tag = repo_and_tag.rsplit(":", 1)
+ else:
+ repository = repo_and_tag
+ tag = "latest"
+
+ is_dockerhub = not registry_host or registry_host in (
+ "docker.io",
+ "registry-1.docker.io",
)
- except FileNotFoundError:
+
+ # Docker Hub official images use "library/" prefix
+ if is_dockerhub and "/" not in repository:
+ repository = f"library/{repository}"
+
+ if is_dockerhub:
+ registry_url = f"docker.io/{repository.split('/')[0]}"
+ else:
+ registry_url = registry_host
+
+ adapter = create_registry_adapter(
+ registry_url=registry_url,
+ username=registry_username,
+ password=registry_password,
+ token=registry_token,
+ )
+
+ tags = adapter.list_tags(repository)
+ if tag not in tags:
+ return Connection(
+ is_connected=False,
+ error=f"Tag '{tag}' not found for image '{image}'.",
+ )
+
+ return Connection(is_connected=True)
+
+ except ImageRegistryAuthError:
return Connection(
is_connected=False,
- error="Trivy binary not found. Please install Trivy.",
+ error="Authentication failed. Check registry credentials.",
+ )
+ except (ImageRegistryNetworkError, ImageRegistryCatalogError) as exc:
+ return Connection(
+ is_connected=False,
+ error=f"Failed to access image: {str(exc)[:200]}",
)
except Exception as error:
if raise_on_exception:
diff --git a/prowler/providers/image/lib/registry/base.py b/prowler/providers/image/lib/registry/base.py
index 2e07fcde93..1bb26ccbfb 100644
--- a/prowler/providers/image/lib/registry/base.py
+++ b/prowler/providers/image/lib/registry/base.py
@@ -9,11 +9,13 @@ from urllib.parse import urlparse
import requests
+from prowler.config.config import prowler_version
from prowler.lib.logger import logger
from prowler.providers.image.exceptions.exceptions import ImageRegistryNetworkError
_MAX_RETRIES = 3
_BACKOFF_BASE = 1
+_USER_AGENT = f"Prowler/{prowler_version} (registry-adapter)"
class RegistryAdapter(ABC):
@@ -70,8 +72,12 @@ class RegistryAdapter(ABC):
context_label = kwargs.pop("context_label", None) or self.registry_url
kwargs.setdefault("timeout", 30)
kwargs.setdefault("verify", self.verify_ssl)
+ headers = kwargs.get("headers", {})
+ headers.setdefault("User-Agent", _USER_AGENT)
+ kwargs["headers"] = headers
last_exception = None
last_status = None
+ last_body = None
for attempt in range(1, _MAX_RETRIES + 1):
try:
resp = requests.request(method, url, **kwargs)
@@ -83,6 +89,16 @@ class RegistryAdapter(ABC):
)
time.sleep(wait)
continue
+ if resp.status_code >= 500:
+ last_status = resp.status_code
+ last_body = (resp.text or "")[:500]
+ wait = _BACKOFF_BASE * (2 ** (attempt - 1))
+ logger.warning(
+ f"Server error from {context_label} (HTTP {resp.status_code}), "
+ f"retrying in {wait}s (attempt {attempt}/{_MAX_RETRIES}): {last_body}"
+ )
+ time.sleep(wait)
+ continue
return resp
except requests.exceptions.ConnectionError as exc:
last_exception = exc
@@ -104,21 +120,27 @@ class RegistryAdapter(ABC):
file=__file__,
message=f"Rate limited by {context_label} after {_MAX_RETRIES} attempts.",
)
+ if last_status is not None and last_status >= 500:
+ raise ImageRegistryNetworkError(
+ file=__file__,
+ message=f"Server error from {context_label} (HTTP {last_status}) after {_MAX_RETRIES} attempts: {last_body}",
+ )
raise ImageRegistryNetworkError(
file=__file__,
message=f"Failed to connect to {context_label} after {_MAX_RETRIES} attempts.",
original_exception=last_exception,
)
- def _next_page_url(self, resp: requests.Response) -> str | None:
+ @staticmethod
+ def _next_page_url(resp: requests.Response) -> str | None:
link_header = resp.headers.get("Link", "")
if not link_header:
return None
match = re.search(r'<([^>]+)>;\s*rel="next"', link_header)
- if not match:
- return None
- url = match.group(1)
- if url.startswith("/"):
- parsed = urlparse(resp.url)
- return f"{parsed.scheme}://{parsed.netloc}{url}"
- return url
+ if match:
+ url = match.group(1)
+ if url.startswith("/"):
+ parsed = urlparse(resp.url)
+ return f"{parsed.scheme}://{parsed.netloc}{url}"
+ return url
+ return None
diff --git a/prowler/providers/image/lib/registry/dockerhub_adapter.py b/prowler/providers/image/lib/registry/dockerhub_adapter.py
index 6b3df09818..949de2d9cb 100644
--- a/prowler/providers/image/lib/registry/dockerhub_adapter.py
+++ b/prowler/providers/image/lib/registry/dockerhub_adapter.py
@@ -115,6 +115,7 @@ class DockerHubAdapter(RegistryAdapter):
return
if not self.username or not self.password:
return
+ logger.debug(f"Docker Hub login attempt for username: {self.username!r}")
resp = self._request_with_retry(
"POST",
f"{_HUB_API}/v2/users/login",
@@ -122,9 +123,14 @@ class DockerHubAdapter(RegistryAdapter):
context_label="Docker Hub",
)
if resp.status_code != 200:
+ body_preview = resp.text[:200] if resp.text else "(empty body)"
raise ImageRegistryAuthError(
file=__file__,
- message=f"Docker Hub login failed (HTTP {resp.status_code}). Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
+ message=(
+ f"Docker Hub login failed (HTTP {resp.status_code}). "
+ f"Check REGISTRY_USERNAME and REGISTRY_PASSWORD. "
+ f"Response: {body_preview}"
+ ),
)
self._hub_jwt = resp.json().get("token")
if not self._hub_jwt:
diff --git a/tests/lib/outputs/html/html_test.py b/tests/lib/outputs/html/html_test.py
index 727138d4b1..09e3038744 100644
--- a/tests/lib/outputs/html/html_test.py
+++ b/tests/lib/outputs/html/html_test.py
@@ -1,7 +1,7 @@
import sys
from io import StringIO
-from mock import patch
+from mock import MagicMock, patch
from prowler.config.config import prowler_version, timestamp
from prowler.lib.logger import logger
@@ -350,6 +350,62 @@ mongodbatlas_html_assessment_summary = """
"""
+image_registry_html_assessment_summary = """
+
+
+
+
+ -
+ Registry URL: myregistry.io
+
+
+
+
+
+
+
+
+ -
+ Image authentication method: Docker login
+
+
+
+
"""
+
+image_list_html_assessment_summary = """
+
+
+
+
+ -
+ Images: nginx:latest, alpine:3.18
+
+
+
+
+
+
+
+
+ -
+ Image authentication method: No auth
+
+
+
+
"""
+
def get_aws_html_header(args: list) -> str:
"""
@@ -854,6 +910,36 @@ class TestHTML:
assert summary == mongodbatlas_html_assessment_summary
+ def test_image_get_assessment_summary_with_registry(self):
+ """Test Image HTML assessment summary with registry URL."""
+ findings = [generate_finding_output()]
+ output = HTML(findings)
+
+ provider = MagicMock()
+ provider.type = "image"
+ provider.registry = "myregistry.io"
+ provider.images = ["nginx:latest", "alpine:3.18"]
+ provider.auth_method = "Docker login"
+
+ summary = output.get_assessment_summary(provider)
+
+ assert summary == image_registry_html_assessment_summary
+
+ def test_image_get_assessment_summary_with_images(self):
+ """Test Image HTML assessment summary with image list."""
+ findings = [generate_finding_output()]
+ output = HTML(findings)
+
+ provider = MagicMock()
+ provider.type = "image"
+ provider.registry = None
+ provider.images = ["nginx:latest", "alpine:3.18"]
+ provider.auth_method = "No auth"
+
+ summary = output.get_assessment_summary(provider)
+
+ assert summary == image_list_html_assessment_summary
+
def test_process_markdown_bold_text(self):
"""Test that **text** is converted to text"""
test_text = "This is **bold text** and this is **also bold**"
diff --git a/tests/providers/image/image_fixtures.py b/tests/providers/image/image_fixtures.py
index a51f934bda..920a7f225a 100644
--- a/tests/providers/image/image_fixtures.py
+++ b/tests/providers/image/image_fixtures.py
@@ -45,8 +45,16 @@ SAMPLE_UNKNOWN_SEVERITY_FINDING = {
"Description": "An issue with unknown severity.",
}
+# Sample image SHA for testing (first 12 chars of a sha256 digest)
+SAMPLE_IMAGE_SHA = "c1aabb73d233"
+SAMPLE_IMAGE_ID = f"sha256:{SAMPLE_IMAGE_SHA}abcdef1234567890"
+
# Full Trivy JSON output structure with a single vulnerability
SAMPLE_TRIVY_IMAGE_OUTPUT = {
+ "Metadata": {
+ "ImageID": SAMPLE_IMAGE_ID,
+ "RepoDigests": [f"alpine@sha256:{SAMPLE_IMAGE_SHA}abcdef1234567890"],
+ },
"Results": [
{
"Target": "alpine:3.18 (alpine 3.18.0)",
@@ -55,11 +63,15 @@ SAMPLE_TRIVY_IMAGE_OUTPUT = {
"Secrets": [],
"Misconfigurations": [],
}
- ]
+ ],
}
# Full Trivy JSON output with mixed finding types
SAMPLE_TRIVY_MULTI_TYPE_OUTPUT = {
+ "Metadata": {
+ "ImageID": SAMPLE_IMAGE_ID,
+ "RepoDigests": [f"myimage@sha256:{SAMPLE_IMAGE_SHA}abcdef1234567890"],
+ },
"Results": [
{
"Target": "myimage:latest (debian 12)",
@@ -68,7 +80,36 @@ SAMPLE_TRIVY_MULTI_TYPE_OUTPUT = {
"Secrets": [SAMPLE_SECRET_FINDING],
"Misconfigurations": [SAMPLE_MISCONFIGURATION_FINDING],
}
- ]
+ ],
+}
+
+# Trivy output with only RepoDigests (no ImageID) for fallback testing
+SAMPLE_TRIVY_REPO_DIGEST_ONLY_OUTPUT = {
+ "Metadata": {
+ "RepoDigests": ["alpine@sha256:e5f6g7h8i9j0abcdef1234567890"],
+ },
+ "Results": [
+ {
+ "Target": "alpine:3.18 (alpine 3.18.0)",
+ "Type": "alpine",
+ "Vulnerabilities": [SAMPLE_VULNERABILITY_FINDING],
+ "Secrets": [],
+ "Misconfigurations": [],
+ }
+ ],
+}
+
+# Trivy output with no Metadata at all
+SAMPLE_TRIVY_NO_METADATA_OUTPUT = {
+ "Results": [
+ {
+ "Target": "alpine:3.18 (alpine 3.18.0)",
+ "Type": "alpine",
+ "Vulnerabilities": [SAMPLE_VULNERABILITY_FINDING],
+ "Secrets": [],
+ "Misconfigurations": [],
+ }
+ ],
}
@@ -90,3 +131,13 @@ def get_invalid_trivy_output():
def get_multi_type_trivy_output():
"""Return Trivy output with multiple finding types as string."""
return json.dumps(SAMPLE_TRIVY_MULTI_TYPE_OUTPUT)
+
+
+def get_repo_digest_only_trivy_output():
+ """Return Trivy output with only RepoDigests (no ImageID) as string."""
+ return json.dumps(SAMPLE_TRIVY_REPO_DIGEST_ONLY_OUTPUT)
+
+
+def get_no_metadata_trivy_output():
+ """Return Trivy output with no Metadata as string."""
+ return json.dumps(SAMPLE_TRIVY_NO_METADATA_OUTPUT)
diff --git a/tests/providers/image/image_provider_test.py b/tests/providers/image/image_provider_test.py
index 32fd92f45a..b25e9edfc2 100644
--- a/tests/providers/image/image_provider_test.py
+++ b/tests/providers/image/image_provider_test.py
@@ -15,11 +15,13 @@ from prowler.providers.image.exceptions.exceptions import (
ImageListFileNotFoundError,
ImageListFileReadError,
ImageNoImagesProvidedError,
+ ImageRegistryAuthError,
ImageScanError,
ImageTrivyBinaryNotFoundError,
)
from prowler.providers.image.image_provider import ImageProvider
from tests.providers.image.image_fixtures import (
+ SAMPLE_IMAGE_SHA,
SAMPLE_MISCONFIGURATION_FINDING,
SAMPLE_SECRET_FINDING,
SAMPLE_UNKNOWN_SEVERITY_FINDING,
@@ -27,6 +29,8 @@ from tests.providers.image.image_fixtures import (
get_empty_trivy_output,
get_invalid_trivy_output,
get_multi_type_trivy_output,
+ get_no_metadata_trivy_output,
+ get_repo_digest_only_trivy_output,
get_sample_trivy_json_output,
)
@@ -42,10 +46,6 @@ def _make_provider(**kwargs):
class TestImageProvider:
- @patch.dict(
- os.environ,
- {"REGISTRY_USERNAME": "", "REGISTRY_PASSWORD": "", "REGISTRY_TOKEN": ""},
- )
def test_image_provider(self):
"""Test default initialization."""
provider = _make_provider()
@@ -124,22 +124,27 @@ class TestImageProvider:
provider = _make_provider()
report = provider._process_finding(
SAMPLE_VULNERABILITY_FINDING,
+ "alpine:3.18",
"alpine:3.18 (alpine 3.18.0)",
- "alpine",
+ image_sha="c1aabb73d233",
)
assert isinstance(report, CheckReportImage)
assert report.status == "FAIL"
assert report.check_metadata.CheckID == "CVE-2024-1234"
assert report.check_metadata.Severity == "high"
- assert report.check_metadata.ServiceName == "alpine"
+ assert report.check_metadata.ServiceName == "container-image"
assert report.check_metadata.ResourceType == "container-image"
assert report.check_metadata.ResourceGroup == "container"
assert report.package_name == "openssl"
assert report.installed_version == "1.1.1k-r0"
assert report.fixed_version == "1.1.1l-r0"
- assert report.resource_name == "alpine:3.18 (alpine 3.18.0)"
+ assert report.resource_name == "alpine:3.18"
+ assert report.image_sha == "c1aabb73d233"
+ assert report.resource_details == "alpine:3.18 (alpine 3.18.0)"
assert report.region == "container"
+ assert report.check_metadata.Categories == ["vulnerability"]
+ assert report.check_metadata.RelatedUrl == ""
def test_process_finding_secret(self):
"""Test processing a secret finding (identified by RuleID)."""
@@ -147,14 +152,15 @@ class TestImageProvider:
report = provider._process_finding(
SAMPLE_SECRET_FINDING,
"myimage:latest",
- "secret",
+ "myimage:latest (debian 12)",
)
assert isinstance(report, CheckReportImage)
assert report.status == "FAIL"
assert report.check_metadata.CheckID == "aws-access-key-id"
assert report.check_metadata.Severity == "critical"
- assert report.check_metadata.ServiceName == "secret"
+ assert report.check_metadata.ServiceName == "container-image"
+ assert report.check_metadata.Categories == ["secrets"]
def test_process_finding_misconfiguration(self):
"""Test processing a misconfiguration finding (identified by ID)."""
@@ -162,13 +168,14 @@ class TestImageProvider:
report = provider._process_finding(
SAMPLE_MISCONFIGURATION_FINDING,
"myimage:latest",
- "misconfiguration",
+ "myimage:latest (debian 12)",
)
assert isinstance(report, CheckReportImage)
assert report.check_metadata.CheckID == "DS001"
assert report.check_metadata.Severity == "medium"
- assert report.check_metadata.ServiceName == "misconfiguration"
+ assert report.check_metadata.ServiceName == "container-image"
+ assert report.check_metadata.Categories == []
def test_process_finding_unknown_severity(self):
"""Test that UNKNOWN severity is mapped to informational."""
@@ -176,7 +183,7 @@ class TestImageProvider:
report = provider._process_finding(
SAMPLE_UNKNOWN_SEVERITY_FINDING,
"myimage:latest",
- "alpine",
+ "myimage:latest (alpine 3.18.0)",
)
assert report.check_metadata.Severity == "informational"
@@ -195,6 +202,9 @@ class TestImageProvider:
assert len(reports) == 1
assert reports[0].check_metadata.CheckID == "CVE-2024-1234"
+ assert reports[0].image_sha == SAMPLE_IMAGE_SHA
+ assert reports[0].resource_name == "alpine:3.18"
+ assert reports[0].check_metadata.ServiceName == "container-image"
@patch("subprocess.run")
def test_run_scan_empty_output(self, mock_subprocess):
@@ -279,20 +289,23 @@ class TestImageProvider:
)
assert "alpine:3.18" in output
- @patch("subprocess.run")
- def test_test_connection_success(self, mock_subprocess):
+ @patch("prowler.providers.image.image_provider.create_registry_adapter")
+ def test_test_connection_success(self, mock_factory):
"""Test successful connection returns is_connected=True."""
- mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
+ mock_adapter = MagicMock()
+ mock_adapter.list_tags.return_value = ["3.18", "latest"]
+ mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(image="alpine:3.18")
assert result.is_connected is True
+ mock_adapter.list_tags.assert_called_once_with("library/alpine")
- @patch("subprocess.run")
- def test_test_connection_auth_failure(self, mock_subprocess):
- """Test 401 error returns auth failure."""
- mock_subprocess.return_value = MagicMock(
- returncode=1, stderr="401 unauthorized"
+ @patch("prowler.providers.image.image_provider.create_registry_adapter")
+ def test_test_connection_auth_failure(self, mock_factory):
+ """Test registry auth error returns auth failure."""
+ mock_factory.return_value = MagicMock(
+ list_tags=MagicMock(side_effect=ImageRegistryAuthError(file=__file__))
)
result = ImageProvider.test_connection(image="private/image:latest")
@@ -300,16 +313,36 @@ class TestImageProvider:
assert result.is_connected is False
assert "Authentication failed" in result.error
- @patch("subprocess.run")
- def test_test_connection_not_found(self, mock_subprocess):
- """Test 404 error returns not found."""
- mock_subprocess.return_value = MagicMock(returncode=1, stderr="404 not found")
+ @patch("prowler.providers.image.image_provider.create_registry_adapter")
+ def test_test_connection_not_found(self, mock_factory):
+ """Test tag not found returns not found error."""
+ mock_adapter = MagicMock()
+ mock_adapter.list_tags.return_value = ["v1", "v2"]
+ mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(image="nonexistent/image:latest")
assert result.is_connected is False
assert "not found" in result.error
+ @patch("prowler.providers.image.image_provider.create_registry_adapter")
+ def test_test_connection_registry_url(self, mock_factory):
+ """Test registry URL (namespace) uses list_repositories."""
+ mock_adapter = MagicMock()
+ mock_adapter.list_repositories.return_value = ["andoniaf/myapp"]
+ mock_factory.return_value = mock_adapter
+
+ result = ImageProvider.test_connection(image="docker.io/andoniaf")
+
+ assert result.is_connected is True
+ mock_factory.assert_called_once_with(
+ registry_url="docker.io/andoniaf",
+ username=None,
+ password=None,
+ token=None,
+ )
+ mock_adapter.list_repositories.assert_called_once()
+
def test_build_status_extended(self):
"""Test status message content for different finding types."""
provider = _make_provider()
@@ -394,6 +427,51 @@ class TestImageProvider:
for _ in provider._scan_single_image("private/image:latest"):
pass
+ @patch("subprocess.run")
+ def test_sha_extraction_from_image_id(self, mock_subprocess):
+ """Test that image_sha is extracted from Trivy Metadata.ImageID."""
+ provider = _make_provider()
+ mock_subprocess.return_value = MagicMock(
+ returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
+ )
+
+ reports = []
+ for batch in provider._scan_single_image("alpine:3.18"):
+ reports.extend(batch)
+
+ assert len(reports) == 1
+ assert reports[0].image_sha == SAMPLE_IMAGE_SHA
+
+ @patch("subprocess.run")
+ def test_sha_extraction_fallback_to_repo_digests(self, mock_subprocess):
+ """Test that image_sha falls back to RepoDigests when ImageID is absent."""
+ provider = _make_provider()
+ mock_subprocess.return_value = MagicMock(
+ returncode=0, stdout=get_repo_digest_only_trivy_output(), stderr=""
+ )
+
+ reports = []
+ for batch in provider._scan_single_image("alpine:3.18"):
+ reports.extend(batch)
+
+ assert len(reports) == 1
+ assert reports[0].image_sha == "e5f6g7h8i9j0"
+
+ @patch("subprocess.run")
+ def test_sha_extraction_no_metadata(self, mock_subprocess):
+ """Test that image_sha is empty when no Metadata is present."""
+ provider = _make_provider()
+ mock_subprocess.return_value = MagicMock(
+ returncode=0, stdout=get_no_metadata_trivy_output(), stderr=""
+ )
+
+ reports = []
+ for batch in provider._scan_single_image("alpine:3.18"):
+ reports.extend(batch)
+
+ assert len(reports) == 1
+ assert reports[0].image_sha == ""
+
@patch("subprocess.run")
def test_run_scan_propagates_scan_error(self, mock_subprocess):
"""Test that run_scan() re-raises ImageScanError instead of swallowing it."""
@@ -409,17 +487,14 @@ class TestImageProvider:
pass
-@patch.dict(
- os.environ, {"REGISTRY_USERNAME": "", "REGISTRY_PASSWORD": "", "REGISTRY_TOKEN": ""}
-)
class TestImageProviderRegistryAuth:
def test_no_auth_by_default(self):
"""Test that no auth is set when no credentials are provided."""
provider = _make_provider()
- assert not provider.registry_username
- assert not provider.registry_password
- assert not provider.registry_token
+ assert provider.registry_username is None
+ assert provider.registry_password is None
+ assert provider.registry_token is None
assert provider.auth_method == "No auth"
def test_basic_auth_with_explicit_params(self):
@@ -431,7 +506,7 @@ class TestImageProviderRegistryAuth:
assert provider.registry_username == "myuser"
assert provider.registry_password == "mypass"
- assert provider.auth_method == "Basic auth"
+ assert provider.auth_method == "Docker login"
def test_token_auth_with_explicit_param(self):
"""Test token auth via explicit constructor param."""
@@ -448,7 +523,7 @@ class TestImageProviderRegistryAuth:
registry_token="my-token",
)
- assert provider.auth_method == "Basic auth"
+ assert provider.auth_method == "Docker login"
@patch.dict(
os.environ, {"REGISTRY_USERNAME": "envuser", "REGISTRY_PASSWORD": "envpass"}
@@ -459,7 +534,7 @@ class TestImageProviderRegistryAuth:
assert provider.registry_username == "envuser"
assert provider.registry_password == "envpass"
- assert provider.auth_method == "Basic auth"
+ assert provider.auth_method == "Docker login"
@patch.dict(os.environ, {"REGISTRY_TOKEN": "env-token"})
def test_token_auth_from_env_var(self):
@@ -491,8 +566,8 @@ class TestImageProviderRegistryAuth:
assert "TRIVY_PASSWORD" not in env
assert "TRIVY_REGISTRY_TOKEN" not in env
- def test_build_trivy_env_basic_auth_injects_trivy_vars(self):
- """Test that _build_trivy_env injects TRIVY_USERNAME/PASSWORD for Trivy native auth."""
+ def test_build_trivy_env_basic_auth_sets_env_vars(self):
+ """Test that _build_trivy_env injects TRIVY_USERNAME/PASSWORD for native Trivy auth."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
@@ -510,8 +585,8 @@ class TestImageProviderRegistryAuth:
assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
@patch("subprocess.run")
- def test_execute_trivy_injects_trivy_env_with_basic_auth(self, mock_subprocess):
- """Test that _execute_trivy sets TRIVY_USERNAME/PASSWORD for Trivy native auth."""
+ def test_execute_trivy_sets_trivy_env_with_basic_auth(self, mock_subprocess):
+ """Test that _execute_trivy sets TRIVY_USERNAME/PASSWORD for native Trivy auth."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
@@ -527,10 +602,12 @@ class TestImageProviderRegistryAuth:
assert env["TRIVY_USERNAME"] == "myuser"
assert env["TRIVY_PASSWORD"] == "mypass"
- @patch("subprocess.run")
- def test_test_connection_with_basic_auth(self, mock_subprocess):
- """Test test_connection passes TRIVY_USERNAME/PASSWORD via env for Trivy native auth."""
- mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
+ @patch("prowler.providers.image.image_provider.create_registry_adapter")
+ def test_test_connection_with_basic_auth(self, mock_factory):
+ """Test test_connection passes credentials to the registry adapter."""
+ mock_adapter = MagicMock()
+ mock_adapter.list_tags.return_value = ["v1"]
+ mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(
image="private.registry.io/myapp:v1",
@@ -539,18 +616,19 @@ class TestImageProviderRegistryAuth:
)
assert result.is_connected is True
- # Should have 1 subprocess call: trivy only (no docker login/pull/logout)
- assert mock_subprocess.call_count == 1
- trivy_call = mock_subprocess.call_args
- assert trivy_call.args[0][0] == "trivy"
- env = trivy_call.kwargs.get("env") or trivy_call[1].get("env")
- assert env["TRIVY_USERNAME"] == "myuser"
- assert env["TRIVY_PASSWORD"] == "mypass"
+ mock_factory.assert_called_once_with(
+ registry_url="private.registry.io",
+ username="myuser",
+ password="mypass",
+ token=None,
+ )
- @patch("subprocess.run")
- def test_test_connection_with_token(self, mock_subprocess):
- """Test test_connection passes token via env."""
- mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
+ @patch("prowler.providers.image.image_provider.create_registry_adapter")
+ def test_test_connection_with_token(self, mock_factory):
+ """Test test_connection passes token to the registry adapter."""
+ mock_adapter = MagicMock()
+ mock_adapter.list_tags.return_value = ["v1"]
+ mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(
image="private.registry.io/myapp:v1",
@@ -558,9 +636,12 @@ class TestImageProviderRegistryAuth:
)
assert result.is_connected is True
- call_kwargs = mock_subprocess.call_args
- env = call_kwargs.kwargs.get("env") or call_kwargs[1].get("env")
- assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
+ mock_factory.assert_called_once_with(
+ registry_url="private.registry.io",
+ username=None,
+ password=None,
+ token="my-token",
+ )
def test_print_credentials_shows_auth_method(self):
"""Test that print_credentials outputs the auth method."""
@@ -573,7 +654,7 @@ class TestImageProviderRegistryAuth:
output = " ".join(
str(call.args[0]) for call in mock_print.call_args_list if call.args
)
- assert "Basic auth" in output
+ assert "Docker login" in output
class TestExtractRegistry:
@@ -616,120 +697,42 @@ class TestExtractRegistry:
assert ImageProvider._extract_registry("nginx") is None
-class TestTrivyAuthIntegration:
- @patch("subprocess.run")
- def test_run_scan_passes_trivy_env_with_credentials(self, mock_subprocess):
- """Test that run_scan() passes TRIVY_USERNAME/PASSWORD via env when credentials are set."""
- mock_subprocess.return_value = MagicMock(
- returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
- )
- provider = _make_provider(
- images=["ghcr.io/user/image:tag"],
- registry_username="myuser",
- registry_password="mypass",
- )
+class TestIsRegistryUrl:
+ def test_registry_url_with_namespace(self):
+ assert ImageProvider._is_registry_url("docker.io/andoniaf") is True
- reports = []
- for batch in provider.run_scan():
- reports.extend(batch)
+ def test_registry_url_ghcr(self):
+ assert ImageProvider._is_registry_url("ghcr.io/org") is True
- calls = mock_subprocess.call_args_list
- # Only trivy calls, no docker login/pull
- assert all(call.args[0][0] == "trivy" for call in calls)
- env = calls[0].kwargs.get("env") or calls[0][1].get("env")
- assert env["TRIVY_USERNAME"] == "myuser"
- assert env["TRIVY_PASSWORD"] == "mypass"
+ def test_image_ref_with_tag(self):
+ assert ImageProvider._is_registry_url("ghcr.io/user/image:tag") is False
- @patch.dict(os.environ, {"REGISTRY_USERNAME": "", "REGISTRY_PASSWORD": ""})
- @patch("subprocess.run")
- def test_run_scan_no_trivy_auth_without_credentials(self, mock_subprocess):
- """Test that run_scan() does NOT set TRIVY_USERNAME/PASSWORD when no credentials."""
- mock_subprocess.return_value = MagicMock(
- returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
- )
+ def test_image_ref_with_repo(self):
+ assert ImageProvider._is_registry_url("ghcr.io/user/image") is False
+
+ def test_dockerhub_short_image(self):
+ assert ImageProvider._is_registry_url("alpine:3.18") is False
+
+ def test_dockerhub_with_namespace(self):
+ assert ImageProvider._is_registry_url("andoniaf/test:tag") is False
+
+ def test_bare_image_name(self):
+ assert ImageProvider._is_registry_url("nginx") is False
+
+ def test_localhost_namespace(self):
+ assert ImageProvider._is_registry_url("localhost:5000/myns") is True
+
+ def test_localhost_image_with_tag(self):
+ assert ImageProvider._is_registry_url("localhost:5000/myns/image:v1") is False
+
+
+class TestCleanup:
+ def test_cleanup_idempotent(self):
+ """Test cleanup is safe to call multiple times."""
provider = _make_provider()
- for batch in provider.run_scan():
- pass
-
- calls = mock_subprocess.call_args_list
- assert all(call.args[0][0] == "trivy" for call in calls)
-
- @patch.dict(os.environ, {"REGISTRY_USERNAME": "", "REGISTRY_PASSWORD": ""})
- @patch("subprocess.run")
- def test_run_scan_token_auth_via_env(self, mock_subprocess):
- """Test that run_scan() passes TRIVY_REGISTRY_TOKEN when only token is provided."""
- mock_subprocess.return_value = MagicMock(
- returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
- )
- provider = _make_provider(registry_token="my-token")
-
- for batch in provider.run_scan():
- pass
-
- calls = mock_subprocess.call_args_list
- assert all(call.args[0][0] == "trivy" for call in calls)
- env = calls[0].kwargs.get("env") or calls[0][1].get("env")
- assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
-
- @patch("subprocess.run")
- def test_run_with_credentials_only_calls_trivy(self, mock_subprocess):
- """Test that run() only calls trivy (no docker login/pull/logout)."""
- mock_subprocess.return_value = MagicMock(
- returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
- )
- provider = _make_provider(
- images=["ghcr.io/user/image:tag"],
- registry_username="myuser",
- registry_password="mypass",
- )
-
- provider.run()
-
- calls = mock_subprocess.call_args_list
- assert all(call.args[0][0] == "trivy" for call in calls)
-
- @patch("subprocess.run")
- def test_run_scan_multiple_images_all_get_trivy_env(self, mock_subprocess):
- """Test that all trivy calls get TRIVY_USERNAME/PASSWORD when scanning multiple images."""
- mock_subprocess.return_value = MagicMock(
- returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
- )
- provider = _make_provider(
- images=["ghcr.io/user/image1:tag", "ghcr.io/user/image2:tag"],
- registry_username="myuser",
- registry_password="mypass",
- )
-
- for batch in provider.run_scan():
- pass
-
- calls = mock_subprocess.call_args_list
- trivy_calls = [c for c in calls if c.args[0][0] == "trivy"]
- assert len(trivy_calls) == 2
- for call in trivy_calls:
- env = call.kwargs.get("env") or call[1].get("env")
- assert env["TRIVY_USERNAME"] == "myuser"
- assert env["TRIVY_PASSWORD"] == "mypass"
-
- @patch("subprocess.run")
- def test_test_connection_docker_hub_uses_trivy_auth(self, mock_subprocess):
- """Test test_connection passes TRIVY creds for Docker Hub images."""
- mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
-
- result = ImageProvider.test_connection(
- image="andoniaf/test-private:tag",
- registry_username="myuser",
- registry_password="mypass",
- )
-
- assert result.is_connected is True
- assert mock_subprocess.call_count == 1
- trivy_call = mock_subprocess.call_args
- assert trivy_call.args[0][0] == "trivy"
- env = trivy_call.kwargs.get("env") or trivy_call[1].get("env")
- assert env["TRIVY_USERNAME"] == "myuser"
- assert env["TRIVY_PASSWORD"] == "mypass"
+ provider.cleanup()
+ provider.cleanup()
class TestImageProviderInputValidation:
@@ -921,3 +924,67 @@ class TestImageProviderNameValidation:
with pytest.raises(ImageListFileReadError):
_make_provider(images=None, image_list_file=file_path)
+
+
+class TestScanPerImage:
+ @patch("subprocess.run")
+ def test_yields_per_image(self, mock_subprocess):
+ """Test that scan_per_image yields (name, findings) per image."""
+ mock_subprocess.return_value = MagicMock(
+ returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
+ )
+ provider = _make_provider(images=["alpine:3.18", "nginx:latest"])
+
+ results = list(provider.scan_per_image())
+
+ assert len(results) == 2
+ for name, findings in results:
+ assert isinstance(name, str)
+ assert isinstance(findings, list)
+ assert all(isinstance(f, CheckReportImage) for f in findings)
+
+ @patch("subprocess.run")
+ def test_reraises_scan_error(self, mock_subprocess):
+ """Test that ImageScanError propagates from scan_per_image."""
+ mock_subprocess.return_value = MagicMock(
+ returncode=1, stdout="", stderr="scan failed"
+ )
+ provider = _make_provider(images=["alpine:3.18"])
+
+ with pytest.raises(ImageScanError):
+ list(provider.scan_per_image())
+
+ @patch("subprocess.run")
+ def test_skips_generic_error(self, mock_subprocess):
+ """Test that a generic RuntimeError in _scan_single_image yields empty findings and continues."""
+
+ def side_effect(cmd, **kwargs):
+ if "bad:image" in cmd:
+ raise RuntimeError("unexpected error")
+ return MagicMock(
+ returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
+ )
+
+ mock_subprocess.side_effect = side_effect
+ provider = _make_provider(images=["bad:image", "alpine:3.18"])
+
+ results = list(provider.scan_per_image())
+
+ assert len(results) == 2
+ assert results[0][0] == "bad:image"
+ assert results[0][1] == []
+ assert results[1][0] == "alpine:3.18"
+ assert len(results[1][1]) > 0
+
+ @patch("subprocess.run")
+ def test_calls_cleanup(self, mock_subprocess):
+ """Test that cleanup is called even after scan_per_image completes."""
+ mock_subprocess.return_value = MagicMock(
+ returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
+ )
+ provider = _make_provider(images=["alpine:3.18"])
+
+ with mock.patch.object(provider, "cleanup") as mock_cleanup:
+ list(provider.scan_per_image())
+
+ mock_cleanup.assert_called_once()
diff --git a/tests/providers/image/lib/registry/test_dockerhub_adapter.py b/tests/providers/image/lib/registry/test_dockerhub_adapter.py
index 4f2d91d57c..930e872bba 100644
--- a/tests/providers/image/lib/registry/test_dockerhub_adapter.py
+++ b/tests/providers/image/lib/registry/test_dockerhub_adapter.py
@@ -99,7 +99,7 @@ class TestDockerHubListTags:
class TestDockerHubLogin:
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_login_failure(self, mock_request):
- resp = MagicMock(status_code=401)
+ resp = MagicMock(status_code=401, text="invalid credentials")
mock_request.return_value = resp
adapter = DockerHubAdapter("docker.io/myorg", username="bad", password="creds")
with pytest.raises(ImageRegistryAuthError, match="login failed"):
@@ -110,6 +110,29 @@ class TestDockerHubLogin:
adapter._hub_login() # Should not raise
assert adapter._hub_jwt is None
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_login_401_includes_response_body(self, mock_request):
+ resp = MagicMock(
+ status_code=401, text='{"detail":"Incorrect authentication credentials"}'
+ )
+ mock_request.return_value = resp
+ adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
+ with pytest.raises(
+ ImageRegistryAuthError, match="Incorrect authentication credentials"
+ ):
+ adapter._hub_login()
+
+ @patch("prowler.providers.image.lib.registry.base.time.sleep")
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_login_500_retried_then_raises_network_error(
+ self, mock_request, mock_sleep
+ ):
+ mock_request.return_value = MagicMock(status_code=500)
+ adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
+ with pytest.raises(ImageRegistryNetworkError, match="Server error"):
+ adapter._hub_login()
+ assert mock_request.call_count == 3
+
class TestDockerHubRetry:
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@@ -133,6 +156,63 @@ class TestDockerHubRetry:
adapter._request_with_retry("GET", "https://hub.docker.com")
assert mock_request.call_count == 3
+ @patch("prowler.providers.image.lib.registry.base.time.sleep")
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_retry_on_500(self, mock_request, mock_sleep):
+ resp_500 = MagicMock(status_code=500)
+ resp_200 = MagicMock(status_code=200)
+ mock_request.side_effect = [resp_500, resp_200]
+ adapter = DockerHubAdapter("docker.io/myorg")
+ result = adapter._request_with_retry("GET", "https://hub.docker.com")
+ assert result.status_code == 200
+ assert mock_request.call_count == 2
+ mock_sleep.assert_called_once()
+
+ @patch("prowler.providers.image.lib.registry.base.time.sleep")
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_retry_exhausted_on_500_raises_network_error(
+ self, mock_request, mock_sleep
+ ):
+ mock_request.return_value = MagicMock(status_code=500)
+ adapter = DockerHubAdapter("docker.io/myorg")
+ with pytest.raises(
+ ImageRegistryNetworkError, match="Server error.*HTTP 500.*3 attempts"
+ ):
+ adapter._request_with_retry("GET", "https://hub.docker.com")
+ assert mock_request.call_count == 3
+
+ @patch("prowler.providers.image.lib.registry.base.time.sleep")
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_4xx_not_retried(self, mock_request, mock_sleep):
+ mock_request.return_value = MagicMock(status_code=403)
+ adapter = DockerHubAdapter("docker.io/myorg")
+ result = adapter._request_with_retry("GET", "https://hub.docker.com")
+ assert result.status_code == 403
+ assert mock_request.call_count == 1
+ mock_sleep.assert_not_called()
+
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_request_sends_user_agent(self, mock_request):
+ mock_request.return_value = MagicMock(status_code=200)
+ adapter = DockerHubAdapter("docker.io/myorg")
+ adapter._request_with_retry("GET", "https://hub.docker.com")
+ _, kwargs = mock_request.call_args
+ from prowler.config.config import prowler_version
+
+ assert (
+ kwargs["headers"]["User-Agent"]
+ == f"Prowler/{prowler_version} (registry-adapter)"
+ )
+
+ @patch("prowler.providers.image.lib.registry.base.time.sleep")
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_retry_500_includes_response_body(self, mock_request, mock_sleep):
+ resp_500 = MagicMock(status_code=500, text="Cloudflare error")
+ mock_request.return_value = resp_500
+ adapter = DockerHubAdapter("docker.io/myorg")
+ with pytest.raises(ImageRegistryNetworkError, match="Cloudflare error"):
+ adapter._request_with_retry("GET", "https://hub.docker.com")
+
class TestDockerHubEmptyTokens:
@patch("prowler.providers.image.lib.registry.base.requests.request")
diff --git a/tests/providers/image/lib/registry/test_oci_adapter.py b/tests/providers/image/lib/registry/test_oci_adapter.py
index 5deda42fe2..b1006ea6cd 100644
--- a/tests/providers/image/lib/registry/test_oci_adapter.py
+++ b/tests/providers/image/lib/registry/test_oci_adapter.py
@@ -288,31 +288,33 @@ class TestOciAdapterRetry:
class TestOciAdapterNextPageUrl:
def test_no_link_header(self):
- adapter = OciRegistryAdapter("reg.io")
resp = MagicMock(headers={})
- assert adapter._next_page_url(resp) is None
+ assert OciRegistryAdapter._next_page_url(resp) is None
def test_link_header_with_next(self):
- adapter = OciRegistryAdapter("reg.io")
resp = MagicMock(
headers={"Link": '; rel="next"'}
)
- assert adapter._next_page_url(resp) == "https://reg.io/v2/_catalog?n=200&last=b"
+ assert (
+ OciRegistryAdapter._next_page_url(resp)
+ == "https://reg.io/v2/_catalog?n=200&last=b"
+ )
+
+ def test_link_header_relative_url(self):
+ resp = MagicMock(
+ headers={"Link": '; rel="next"'},
+ url="https://reg.io/v2/_catalog?n=200",
+ )
+ assert (
+ OciRegistryAdapter._next_page_url(resp)
+ == "https://reg.io/v2/_catalog?n=200&last=b"
+ )
def test_link_header_no_next(self):
- adapter = OciRegistryAdapter("reg.io")
resp = MagicMock(
headers={"Link": '; rel="prev"'}
)
- assert adapter._next_page_url(resp) is None
-
- def test_link_header_relative_url(self):
- adapter = OciRegistryAdapter("reg.io")
- resp = MagicMock(
- url="https://reg.io/v2/_catalog?n=200",
- headers={"Link": '; rel="next"'},
- )
- assert adapter._next_page_url(resp) == "https://reg.io/v2/_catalog?last=b&n=200"
+ assert OciRegistryAdapter._next_page_url(resp) is None
class TestOciAdapterSSRF:
diff --git a/tests/providers/image/lib/registry/test_provider_registry.py b/tests/providers/image/lib/registry/test_provider_registry.py
index 86667f0412..97901ea0c4 100644
--- a/tests/providers/image/lib/registry/test_provider_registry.py
+++ b/tests/providers/image/lib/registry/test_provider_registry.py
@@ -152,16 +152,15 @@ class TestEmptyRegistry:
class TestRegistryList:
@patch("prowler.providers.image.image_provider.create_registry_adapter")
- def test_registry_list_prints_and_exits(self, mock_factory, capsys):
+ def test_registry_list_prints_and_returns(self, mock_factory, capsys):
adapter = MagicMock()
adapter.list_repositories.return_value = ["app/frontend", "app/backend"]
adapter.list_tags.side_effect = [["latest", "v1.0"], ["latest"]]
mock_factory.return_value = adapter
- with pytest.raises(SystemExit) as exc_info:
- _build_provider(registry_list_images=True)
+ provider = _build_provider(registry_list_images=True)
- assert exc_info.value.code == 0
+ assert provider._listing_only is True
captured = capsys.readouterr()
assert "app/frontend" in captured.out
assert "app/backend" in captured.out
@@ -177,10 +176,9 @@ class TestRegistryList:
adapter.list_tags.return_value = ["latest"]
mock_factory.return_value = adapter
- with pytest.raises(SystemExit) as exc_info:
- _build_provider(registry_list_images=True, image_filter="^prod/")
+ provider = _build_provider(registry_list_images=True, image_filter="^prod/")
- assert exc_info.value.code == 0
+ assert provider._listing_only is True
captured = capsys.readouterr()
assert "prod/app" in captured.out
assert "dev/app" not in captured.out
@@ -193,10 +191,9 @@ class TestRegistryList:
adapter.list_tags.return_value = ["latest", "v1.0", "dev-abc"]
mock_factory.return_value = adapter
- with pytest.raises(SystemExit) as exc_info:
- _build_provider(registry_list_images=True, tag_filter=r"^v\d+\.\d+$")
+ provider = _build_provider(registry_list_images=True, tag_filter=r"^v\d+\.\d+$")
- assert exc_info.value.code == 0
+ assert provider._listing_only is True
captured = capsys.readouterr()
assert "v1.0" in captured.out
assert "dev-abc" not in captured.out
@@ -210,10 +207,9 @@ class TestRegistryList:
mock_factory.return_value = adapter
# max_images=1 would normally raise, but --registry-list skips it
- with pytest.raises(SystemExit) as exc_info:
- _build_provider(registry_list_images=True, max_images=1)
+ provider = _build_provider(registry_list_images=True, max_images=1)
- assert exc_info.value.code == 0
+ assert provider._listing_only is True
captured = capsys.readouterr()
assert "6 images" in captured.out