Compare commits

...

7 Commits

Author SHA1 Message Date
Andoni A.
bd9c4d696f feat(image): enable misconfig scanner by default
Include misconfiguration scanning alongside vuln and secret as a
default Trivy scanner for container image assessments.
2026-02-24 12:38:20 +01:00
Andoni A.
0caaddfb4a fix(image): resolve migration conflict and refactor API connection test
- Renumber image migration from 0076 to 0081 to avoid conflict with openstack
- Add registry parameter to ImageProvider.test_connection() for registry-level testing
- Refactor API connection test to delegate to test_connection() like other providers
- Add tests for registry path in test_connection()
2026-02-19 16:27:04 +01:00
Andoni A.
8197418f71 wip: update pyproject.toml 2026-02-19 13:38:06 +01:00
Andoni A.
c14cd0e0bc feat(image): add image provider API support with registry scan mode
- Add IMAGE provider type to API models, serializers, views, and utils
- Add registry credential handling (username/password, token, filters)
- Add UID validation for registry URLs with port validation
- Add connection testing via OCI registry adapter
- Add registry scan mode with OCI, Docker Hub, and ECR adapter layer
- Add per-image progress tracking and Trivy native auth
- Skip compliance/reports for IMAGE provider scans
- Add migration, OpenAPI spec updates, and comprehensive tests
2026-02-19 13:07:48 +01:00
Andoni A.
d4d0a8a05a feat(image): add docker login and pull for private registry authentication
Trivy's remote source cannot authenticate against Docker Hub (and some
other registries) even after docker login. This adds a docker login +
docker pull flow before scanning so Trivy can access private images
from the local Docker daemon.

- Add _docker_login, _docker_pull, _docker_logout, cleanup methods
- Add _extract_registry to determine registry from image reference
- Wrap run() in try/finally to ensure cleanup on success or error
- Wire registry credentials from CLI args to ImageProvider
- Add ImageDockerLoginError and ImageDockerNotFoundError exceptions
2026-02-19 12:59:19 +01:00
Andoni A.
a787c62b9f chore(image): remove POC mention from CHANGELOG and drop provider README 2026-02-19 12:57:36 +01:00
Andoni A.
6ffcafad94 feat(image): add container image provider for CLI scanning
Add a new Image provider that uses Trivy for container image vulnerability
and secret scanning, integrated into the Prowler CLI.

- ImageProvider class with Trivy integration for vuln/secret/misconfig scanning
- CLI support via `prowler image -I <image>` with severity filters, timeout,
  ignore-unfixed, and image-list-file options
- CheckReportImage model for image-specific findings
- Custom exceptions (9000-9005) with clear remediation messages
- Error handling for Trivy failures (non-zero exit, binary not found)
- Batch processing of findings with progress bar
- test_connection() for registry accessibility checks
- Comprehensive test coverage
2026-02-19 12:57:35 +01:00
32 changed files with 3283 additions and 65 deletions

View File

@@ -8,6 +8,7 @@ All notable changes to the **Prowler API** are documented in this file.
- OpenStack provider support [(#10003)](https://github.com/prowler-cloud/prowler/pull/10003)
- PDF report for the CSA CCM compliance framework [(#10088)](https://github.com/prowler-cloud/prowler/pull/10088)
- Image provider support with registry credential handling, UID validation, and connection testing
### 🔄 Changed

View File

@@ -24,7 +24,7 @@ dependencies = [
"drf-spectacular-jsonapi==0.5.1",
"gunicorn==23.0.0",
"lxml==5.3.2",
"prowler @ git+https://github.com/prowler-cloud/prowler.git@master",
"prowler @ git+https://github.com/prowler-cloud/prowler.git@feat/PROWLER-940-stage-2-a-image-provider-api-rebased",
"psycopg2-binary==2.9.9",
"pytest-celery[redis] (>=1.0.1,<2.0.0)",
"sentry-sdk[django] (>=2.20.0,<3.0.0)",

View File

@@ -0,0 +1,40 @@
# Generated by Django migration for Image provider support
from django.db import migrations
import api.db_utils
class Migration(migrations.Migration):
dependencies = [
("api", "0080_backfill_attack_paths_graph_data_ready"),
]
operations = [
migrations.AlterField(
model_name="provider",
name="provider",
field=api.db_utils.ProviderEnumField(
choices=[
("aws", "AWS"),
("azure", "Azure"),
("gcp", "GCP"),
("kubernetes", "Kubernetes"),
("m365", "M365"),
("github", "GitHub"),
("mongodbatlas", "MongoDB Atlas"),
("iac", "IaC"),
("oraclecloud", "Oracle Cloud Infrastructure"),
("alibabacloud", "Alibaba Cloud"),
("cloudflare", "Cloudflare"),
("openstack", "OpenStack"),
("image", "Image"),
],
default="aws",
),
),
migrations.RunSQL(
"ALTER TYPE provider ADD VALUE IF NOT EXISTS 'image';",
reverse_sql=migrations.RunSQL.noop,
),
]

View File

@@ -289,6 +289,7 @@ class Provider(RowLevelSecurityProtectedModel):
ALIBABACLOUD = "alibabacloud", _("Alibaba Cloud")
CLOUDFLARE = "cloudflare", _("Cloudflare")
OPENSTACK = "openstack", _("OpenStack")
IMAGE = "image", _("Image")
@staticmethod
def validate_aws_uid(value):
@@ -423,6 +424,26 @@ class Provider(RowLevelSecurityProtectedModel):
pointer="/data/attributes/uid",
)
@staticmethod
def validate_image_uid(value):
pattern = r"^[a-zA-Z0-9]([a-zA-Z0-9._-]*[a-zA-Z0-9])?(:\d{1,5})?(/[a-zA-Z0-9._-]+)*/?$"
if not re.match(pattern, value):
raise ModelValidationError(
detail="Image provider ID must be a valid registry URL "
"(e.g., docker.io, ghcr.io, registry.example.com:5000).",
code="image-uid",
pointer="/data/attributes/uid",
)
port_match = re.search(r":(\d{1,5})(?=/|$)", value)
if port_match:
port = int(port_match.group(1))
if not 1 <= port <= 65535:
raise ModelValidationError(
detail="Port number must be between 1 and 65535.",
code="image-uid",
pointer="/data/attributes/uid",
)
id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
inserted_at = models.DateTimeField(auto_now_add=True, editable=False)
updated_at = models.DateTimeField(auto_now=True, editable=False)
@@ -446,6 +467,8 @@ class Provider(RowLevelSecurityProtectedModel):
def clean(self):
super().clean()
if self.provider == self.ProviderChoices.IMAGE.value and self.uid:
self.uid = re.sub(r"^https?://", "", self.uid)
getattr(self, f"validate_{self.provider}_uid")(self.uid)
def save(self, *args, **kwargs):

View File

@@ -24,6 +24,7 @@ from prowler.providers.cloudflare.cloudflare_provider import CloudflareProvider
from prowler.providers.gcp.gcp_provider import GcpProvider
from prowler.providers.github.github_provider import GithubProvider
from prowler.providers.iac.iac_provider import IacProvider
from prowler.providers.image.image_provider import ImageProvider
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
from prowler.providers.m365.m365_provider import M365Provider
from prowler.providers.mongodbatlas.mongodbatlas_provider import MongodbatlasProvider
@@ -122,6 +123,7 @@ class TestReturnProwlerProvider:
(Provider.ProviderChoices.ALIBABACLOUD.value, AlibabacloudProvider),
(Provider.ProviderChoices.CLOUDFLARE.value, CloudflareProvider),
(Provider.ProviderChoices.OPENSTACK.value, OpenstackProvider),
(Provider.ProviderChoices.IMAGE.value, ImageProvider),
],
)
def test_return_prowler_provider(self, provider_type, expected_provider):
@@ -188,6 +190,54 @@ class TestProwlerProviderConnectionTest:
assert isinstance(connection.error, Provider.secret.RelatedObjectDoesNotExist)
assert str(connection.error) == "Provider has no secret."
@patch(
"prowler.providers.image.lib.registry.factory.create_registry_adapter",
)
@patch("api.utils.return_prowler_provider")
def test_prowler_provider_connection_test_image_success(
self, mock_return_prowler_provider, mock_create_adapter
):
provider = MagicMock()
provider.provider = Provider.ProviderChoices.IMAGE.value
provider.uid = "ghcr.io"
provider.secret.secret = {"registry_token": "tok"}
mock_adapter = MagicMock()
mock_create_adapter.return_value = mock_adapter
connection = prowler_provider_connection_test(provider)
assert connection.is_connected is True
assert connection.error is None
mock_create_adapter.assert_called_once_with(
registry_url="ghcr.io",
username=None,
password=None,
token="tok",
)
mock_adapter.list_repositories.assert_called_once()
@patch(
"prowler.providers.image.lib.registry.factory.create_registry_adapter",
)
@patch("api.utils.return_prowler_provider")
def test_prowler_provider_connection_test_image_failure(
self, mock_return_prowler_provider, mock_create_adapter
):
provider = MagicMock()
provider.provider = Provider.ProviderChoices.IMAGE.value
provider.uid = "ghcr.io"
provider.secret.secret = {"registry_token": "bad-token"}
mock_adapter = MagicMock()
mock_adapter.list_repositories.side_effect = Exception("401 Unauthorized")
mock_create_adapter.return_value = mock_adapter
connection = prowler_provider_connection_test(provider)
assert connection.is_connected is False
assert connection.error == "401 Unauthorized"
class TestGetProwlerProviderKwargs:
@pytest.mark.parametrize(
@@ -336,6 +386,103 @@ class TestGetProwlerProviderKwargs:
}
assert result == expected_result
def test_get_prowler_provider_kwargs_image_provider(self):
"""Test that Image provider gets correct kwargs with registry URL and auth."""
provider_uid = "ghcr.io"
secret_dict = {
"registry_username": "user",
"registry_password": "pass",
}
secret_mock = MagicMock()
secret_mock.secret = secret_dict
provider = MagicMock()
provider.provider = Provider.ProviderChoices.IMAGE.value
provider.secret = secret_mock
provider.uid = provider_uid
result = get_prowler_provider_kwargs(provider)
expected_result = {
"registry": provider_uid,
"registry_username": "user",
"registry_password": "pass",
}
assert result == expected_result
def test_get_prowler_provider_kwargs_image_provider_with_filters(self):
"""Test that Image provider includes scan filters."""
provider_uid = "docker.io"
secret_dict = {
"registry_token": "ghp_abc123",
"image_filter": "my-app.*",
"tag_filter": "v[0-9]+",
"max_images": 50,
}
secret_mock = MagicMock()
secret_mock.secret = secret_dict
provider = MagicMock()
provider.provider = Provider.ProviderChoices.IMAGE.value
provider.secret = secret_mock
provider.uid = provider_uid
result = get_prowler_provider_kwargs(provider)
expected_result = {
"registry": provider_uid,
"registry_token": "ghp_abc123",
"image_filter": "my-app.*",
"tag_filter": "v[0-9]+",
"max_images": 50,
}
assert result == expected_result
def test_get_prowler_provider_kwargs_image_provider_no_auth(self):
"""Test that Image provider works with empty secret for public registries."""
provider_uid = "docker.io"
secret_dict = {}
secret_mock = MagicMock()
secret_mock.secret = secret_dict
provider = MagicMock()
provider.provider = Provider.ProviderChoices.IMAGE.value
provider.secret = secret_mock
provider.uid = provider_uid
result = get_prowler_provider_kwargs(provider)
expected_result = {"registry": provider_uid}
assert result == expected_result
def test_get_prowler_provider_kwargs_image_provider_ignores_mutelist(self):
"""Test that Image provider does NOT receive mutelist_content.
Image provider uses Trivy's built-in logic, so it should not
receive mutelist_content even when a mutelist processor is configured.
"""
provider_uid = "ghcr.io"
secret_dict = {"registry_token": "test_token"}
secret_mock = MagicMock()
secret_mock.secret = secret_dict
mutelist_processor = MagicMock()
mutelist_processor.configuration = {"Mutelist": {"key": "value"}}
provider = MagicMock()
provider.provider = Provider.ProviderChoices.IMAGE.value
provider.secret = secret_mock
provider.uid = provider_uid
result = get_prowler_provider_kwargs(provider, mutelist_processor)
assert "mutelist_content" not in result
expected_result = {
"registry": provider_uid,
"registry_token": "test_token",
}
assert result == expected_result
def test_get_prowler_provider_kwargs_unsupported_provider(self):
# Setup
provider_uid = "provider_uid"

View File

@@ -1189,6 +1189,26 @@ class TestProviderViewSet:
"uid": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"alias": "OpenStack Project",
},
{
"provider": "image",
"uid": "ghcr.io",
"alias": "GitHub Registry",
},
{
"provider": "image",
"uid": "docker.io",
"alias": "Docker Hub",
},
{
"provider": "image",
"uid": "123456789012.dkr.ecr.us-east-1.amazonaws.com",
"alias": "ECR",
},
{
"provider": "image",
"uid": "https://registry.example.com:5000",
"alias": "Private",
},
]
),
)
@@ -1633,6 +1653,26 @@ class TestProviderViewSet:
"min_length",
"uid",
),
# Image UID validation - too short (below min_length)
(
{
"provider": "image",
"uid": "ab",
"alias": "test",
},
"min_length",
"uid",
),
# Image UID validation - invalid characters (space)
(
{
"provider": "image",
"uid": "not valid!",
"alias": "test",
},
"image-uid",
"uid",
),
]
),
)
@@ -1788,6 +1828,38 @@ class TestProviderViewSet:
assert "Content-Location" in response.headers
assert response.headers["Content-Location"] == f"/api/v1/tasks/{task_mock.id}"
@patch("api.v1.views.Task.objects.get")
@patch("api.v1.views.check_provider_connection_task.delay")
def test_providers_connection_image(
self,
mock_provider_connection,
mock_task_get,
authenticated_client,
providers_fixture,
tasks_fixture,
):
prowler_task = tasks_fixture[0]
task_mock = Mock()
task_mock.id = prowler_task.id
task_mock.status = "PENDING"
mock_provider_connection.return_value = task_mock
mock_task_get.return_value = prowler_task
image_provider = providers_fixture[10]
assert image_provider.provider == "image"
assert image_provider.connected is None
assert image_provider.connection_last_checked_at is None
response = authenticated_client.post(
reverse("provider-connection", kwargs={"pk": image_provider.id})
)
assert response.status_code == status.HTTP_202_ACCEPTED
mock_provider_connection.assert_called_once_with(
provider_id=str(image_provider.id), tenant_id=ANY
)
assert "Content-Location" in response.headers
assert response.headers["Content-Location"] == f"/api/v1/tasks/{task_mock.id}"
def test_providers_connection_invalid_provider(
self, authenticated_client, providers_fixture
):
@@ -2436,6 +2508,39 @@ class TestProviderSecretViewSet:
"clouds_yaml_cloud": "mycloud",
},
),
# Image with Docker login
(
Provider.ProviderChoices.IMAGE.value,
ProviderSecret.TypeChoices.STATIC,
{
"registry_username": "user",
"registry_password": "pass",
},
),
# Image with token
(
Provider.ProviderChoices.IMAGE.value,
ProviderSecret.TypeChoices.STATIC,
{
"registry_token": "ghp_abc123",
},
),
# Image with no auth + filters
(
Provider.ProviderChoices.IMAGE.value,
ProviderSecret.TypeChoices.STATIC,
{
"image_filter": "my-app.*",
"tag_filter": "v[0-9]+",
"max_images": 50,
},
),
# Image with empty secret (public registry)
(
Provider.ProviderChoices.IMAGE.value,
ProviderSecret.TypeChoices.STATIC,
{},
),
],
)
def test_provider_secrets_create_valid(

View File

@@ -28,6 +28,7 @@ if TYPE_CHECKING:
from prowler.providers.gcp.gcp_provider import GcpProvider
from prowler.providers.github.github_provider import GithubProvider
from prowler.providers.iac.iac_provider import IacProvider
from prowler.providers.image.image_provider import ImageProvider
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
from prowler.providers.m365.m365_provider import M365Provider
from prowler.providers.mongodbatlas.mongodbatlas_provider import (
@@ -83,6 +84,7 @@ def return_prowler_provider(
| GcpProvider
| GithubProvider
| IacProvider
| ImageProvider
| KubernetesProvider
| M365Provider
| MongodbatlasProvider
@@ -95,7 +97,7 @@ def return_prowler_provider(
provider (Provider): The provider object containing the provider type and associated secrets.
Returns:
AlibabacloudProvider | AwsProvider | AzureProvider | CloudflareProvider | GcpProvider | GithubProvider | IacProvider | KubernetesProvider | M365Provider | MongodbatlasProvider | OpenstackProvider | OraclecloudProvider: The corresponding provider class.
AlibabacloudProvider | AwsProvider | AzureProvider | CloudflareProvider | GcpProvider | GithubProvider | IacProvider | ImageProvider | KubernetesProvider | M365Provider | MongodbatlasProvider | OpenstackProvider | OraclecloudProvider: The corresponding provider class.
Raises:
ValueError: If the provider type specified in `provider.provider` is not supported.
@@ -159,6 +161,10 @@ def return_prowler_provider(
from prowler.providers.openstack.openstack_provider import OpenstackProvider
prowler_provider = OpenstackProvider
case Provider.ProviderChoices.IMAGE.value:
from prowler.providers.image.image_provider import ImageProvider
prowler_provider = ImageProvider
case _:
raise ValueError(f"Provider type {provider.provider} not supported")
return prowler_provider
@@ -221,11 +227,30 @@ def get_prowler_provider_kwargs(
# clouds.yaml is not feasible because not all auth methods include it and the
# Keystone API is unavailable on public clouds.
pass
elif provider.provider == Provider.ProviderChoices.IMAGE.value:
prowler_provider_kwargs = {
"registry": provider.uid,
}
secret = provider.secret.secret
for key in (
"registry_username",
"registry_password",
"registry_token",
"image_filter",
"tag_filter",
):
if key in secret:
prowler_provider_kwargs[key] = secret[key]
if "max_images" in secret:
prowler_provider_kwargs["max_images"] = int(secret["max_images"])
if mutelist_processor:
mutelist_content = mutelist_processor.configuration.get("Mutelist", {})
# IaC provider doesn't support mutelist (uses Trivy's built-in logic)
if mutelist_content and provider.provider != Provider.ProviderChoices.IAC.value:
# IaC and Image providers don't support mutelist (Trivy handles its own logic)
if mutelist_content and provider.provider not in (
Provider.ProviderChoices.IAC.value,
Provider.ProviderChoices.IMAGE.value,
):
prowler_provider_kwargs["mutelist_content"] = mutelist_content
return prowler_provider_kwargs
@@ -242,6 +267,7 @@ def initialize_prowler_provider(
| GcpProvider
| GithubProvider
| IacProvider
| ImageProvider
| KubernetesProvider
| M365Provider
| MongodbatlasProvider
@@ -255,7 +281,7 @@ def initialize_prowler_provider(
mutelist_processor (Processor): The mutelist processor object containing the mutelist configuration.
Returns:
AlibabacloudProvider | AwsProvider | AzureProvider | CloudflareProvider | GcpProvider | GithubProvider | IacProvider | KubernetesProvider | M365Provider | MongodbatlasProvider | OpenstackProvider | OraclecloudProvider: An instance of the corresponding provider class
AlibabacloudProvider | AwsProvider | AzureProvider | CloudflareProvider | GcpProvider | GithubProvider | IacProvider | ImageProvider | KubernetesProvider | M365Provider | MongodbatlasProvider | OpenstackProvider | OraclecloudProvider: An instance of the corresponding provider class
initialized with the provider's secrets.
"""
prowler_provider = return_prowler_provider(provider)
@@ -297,6 +323,15 @@ def prowler_provider_connection_test(provider: Provider) -> Connection:
"raise_on_exception": False,
}
return prowler_provider.test_connection(**openstack_kwargs)
elif provider.provider == Provider.ProviderChoices.IMAGE.value:
image_kwargs = {
"registry": provider.uid,
"raise_on_exception": False,
}
for key in ("registry_username", "registry_password", "registry_token"):
if key in prowler_provider_kwargs:
image_kwargs[key] = prowler_provider_kwargs[key]
return prowler_provider.test_connection(**image_kwargs)
else:
return prowler_provider.test_connection(
**prowler_provider_kwargs,

View File

@@ -388,6 +388,37 @@ from rest_framework_json_api import serializers
},
"required": ["clouds_yaml_content", "clouds_yaml_cloud"],
},
{
"type": "object",
"title": "Image Registry Credentials",
"properties": {
"registry_username": {
"type": "string",
"description": "Username for Docker login authentication.",
},
"registry_password": {
"type": "string",
"description": "Password for Docker login authentication.",
},
"registry_token": {
"type": "string",
"description": "Bearer token for registry authentication.",
},
"image_filter": {
"type": "string",
"description": "Regex pattern to filter repository names during registry enumeration.",
},
"tag_filter": {
"type": "string",
"description": "Regex pattern to filter image tags during registry enumeration.",
},
"max_images": {
"type": "integer",
"minimum": 0,
"description": "Maximum number of images to scan (0 = unlimited).",
},
},
},
]
}
)

View File

@@ -1528,6 +1528,8 @@ class BaseWriteProviderSecretSerializer(BaseWriteSerializer):
)
elif provider_type == Provider.ProviderChoices.OPENSTACK.value:
serializer = OpenStackCloudsYamlProviderSecret(data=secret)
elif provider_type == Provider.ProviderChoices.IMAGE.value:
serializer = ImageProviderSecret(data=secret)
else:
raise serializers.ValidationError(
{"provider": f"Provider type not supported {provider_type}"}
@@ -1666,6 +1668,37 @@ class IacProviderSecret(serializers.Serializer):
resource_name = "provider-secrets"
class ImageProviderSecret(serializers.Serializer):
registry_username = serializers.CharField(required=False)
registry_password = serializers.CharField(required=False)
registry_token = serializers.CharField(required=False)
image_filter = serializers.CharField(required=False)
tag_filter = serializers.CharField(required=False)
max_images = serializers.IntegerField(required=False, min_value=0)
def validate(self, attrs):
has_username = attrs.get("registry_username")
has_password = attrs.get("registry_password")
has_token = attrs.get("registry_token")
if (has_username or has_password) and has_token:
raise serializers.ValidationError(
"You cannot provide both registry_username/registry_password and registry_token."
)
if has_username and not has_password:
raise serializers.ValidationError(
"registry_password is required when registry_username is provided."
)
if has_password and not has_username:
raise serializers.ValidationError(
"registry_username is required when registry_password is provided."
)
return super().validate(attrs)
class Meta:
resource_name = "provider-secrets"
class OracleCloudProviderSecret(serializers.Serializer):
user = serializers.CharField()
fingerprint = serializers.CharField()

View File

@@ -543,6 +543,12 @@ def providers_fixture(tenants_fixture):
alias="openstack_testing",
tenant_id=tenant.id,
)
provider12 = Provider.objects.create(
provider="image",
uid="ghcr.io",
alias="image_testing",
tenant_id=tenant.id,
)
return (
provider1,
@@ -556,6 +562,7 @@ def providers_fixture(tenants_fixture):
provider9,
provider10,
provider11,
provider12,
)

View File

@@ -137,6 +137,10 @@ COMPLIANCE_CLASS_MAP = {
# IaC provider doesn't have specific compliance frameworks yet
# Trivy handles its own compliance checks
],
"image": [
# Image provider doesn't have specific compliance frameworks yet
# Trivy handles its own compliance checks
],
"oraclecloud": [
(lambda name: name.startswith("cis_"), OracleCloudCIS),
(lambda name: name.startswith("csa_"), OracleCloudCSA),

View File

@@ -134,13 +134,41 @@ def _perform_scan_complete_tasks(tenant_id: str, scan_id: str, provider_id: str)
scan_id (str): The ID of the scan that was performed.
provider_id (str): The primary key of the Provider instance that was scanned.
"""
chain(
create_compliance_requirements_task.si(tenant_id=tenant_id, scan_id=scan_id),
update_provider_compliance_scores_task.si(tenant_id=tenant_id, scan_id=scan_id),
).apply_async()
aggregate_attack_surface_task.apply_async(
kwargs={"tenant_id": tenant_id, "scan_id": scan_id}
with rls_transaction(tenant_id):
provider_type = Provider.objects.get(id=provider_id).provider
has_compliance = provider_type not in (
Provider.ProviderChoices.IAC.value,
Provider.ProviderChoices.IMAGE.value,
)
if has_compliance:
chain(
create_compliance_requirements_task.si(
tenant_id=tenant_id, scan_id=scan_id
),
update_provider_compliance_scores_task.si(
tenant_id=tenant_id, scan_id=scan_id
),
).apply_async()
aggregate_attack_surface_task.apply_async(
kwargs={"tenant_id": tenant_id, "scan_id": scan_id}
)
final_group_tasks = [
check_integrations_task.si(
tenant_id=tenant_id,
provider_id=provider_id,
scan_id=scan_id,
),
]
if has_compliance:
final_group_tasks.append(
generate_compliance_reports_task.si(
tenant_id=tenant_id, scan_id=scan_id, provider_id=provider_id
),
)
chain(
perform_scan_summary_task.si(tenant_id=tenant_id, scan_id=scan_id),
group(
@@ -149,17 +177,7 @@ def _perform_scan_complete_tasks(tenant_id: str, scan_id: str, provider_id: str)
scan_id=scan_id, provider_id=provider_id, tenant_id=tenant_id
),
),
group(
# Use optimized task that generates both reports with shared queries
generate_compliance_reports_task.si(
tenant_id=tenant_id, scan_id=scan_id, provider_id=provider_id
),
check_integrations_task.si(
tenant_id=tenant_id,
provider_id=provider_id,
scan_id=scan_id,
),
),
group(*final_group_tasks),
).apply_async()
if can_provider_run_attack_paths_scan(tenant_id, provider_id):

View File

@@ -142,6 +142,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- CIS 5.0 compliance framework for the Azure provider [(#9777)](https://github.com/prowler-cloud/prowler/pull/9777)
- `Cloudflare` Bot protection, WAF, Privacy, Anti-Scraping and Zone configuration checks [(#9425)](https://github.com/prowler-cloud/prowler/pull/9425)
- `Cloudflare` `waf` and `dns record` checks [(#9426)](https://github.com/prowler-cloud/prowler/pull/9426)
- Container Image provider using Trivy for vulnerability and secret scanning
### Changed

View File

@@ -27,6 +27,7 @@ from prowler.lib.scan.exceptions.exceptions import (
from prowler.providers.common.models import Audit_Metadata, ProviderOutputOptions
from prowler.providers.common.provider import Provider
from prowler.providers.iac.iac_provider import IacProvider
from prowler.providers.image.image_provider import ImageProvider
class Scan:
@@ -92,10 +93,10 @@ class Scan:
except ValueError:
raise ScanInvalidStatusError(f"Invalid status provided: {s}.")
# Special setup for IaC provider - override inputs to work with traditional flow
if provider.type == "iac":
# IaC doesn't use traditional Prowler checks, so clear all input parameters
# to avoid validation errors and let it flow through the normal logic
# Special setup for IaC/Image providers - override inputs to work with traditional flow
if provider.type in ("iac", "image"):
# These providers don't use traditional Prowler checks, so clear all input
# parameters to avoid validation errors and let them flow through the normal logic
checks = None
services = None
excluded_checks = None
@@ -160,8 +161,8 @@ class Scan:
)
# Load checks to execute
if provider.type == "iac":
self._checks_to_execute = ["iac_scan"] # Dummy check name for IaC
if provider.type in ("iac", "image"):
self._checks_to_execute = [f"{provider.type}_scan"] # Dummy check name
else:
self._checks_to_execute = sorted(
load_checks_to_execute(
@@ -200,8 +201,8 @@ class Scan:
self._number_of_checks_to_execute = len(self._checks_to_execute)
# Set up service-based checks tracking
if provider.type == "iac":
service_checks_to_execute = {"iac": set(["iac_scan"])}
if provider.type in ("iac", "image"):
service_checks_to_execute = {provider.type: set([f"{provider.type}_scan"])}
else:
service_checks_to_execute = get_service_checks_to_execute(
self._checks_to_execute
@@ -346,6 +347,68 @@ class Scan:
self._duration = int((end_time - start_time).total_seconds())
return
# Special handling for Image provider
elif self._provider.type == "image":
if isinstance(self._provider, ImageProvider):
logger.info("Running Image scan with Trivy...")
total_images = len(self._provider.images)
self._number_of_checks_to_execute = max(total_images, 1)
for i, (image_name, image_reports) in enumerate(
self._provider.scan_per_image()
):
# Build resource UID from image name + SHA (all reports share the same SHA)
image_sha = image_reports[0].image_sha if image_reports else ""
resource_uid = (
f"{image_name}:{image_sha}" if image_sha else image_name
)
findings = []
for report in image_reports:
finding_uid = (
f"{report.check_metadata.CheckID}"
f"-{image_name}"
f"-{report.resource_id}"
)
status_enum = (
Status.FAIL if report.status == "FAIL" else Status.PASS
)
if report.muted:
status_enum = Status.MUTED
finding = Finding(
auth_method="Registry",
timestamp=datetime.datetime.now(timezone.utc),
account_uid=getattr(self._provider, "registry", None)
or "image",
account_name="Container Registry",
metadata=report.check_metadata,
uid=finding_uid,
status=status_enum,
status_extended=report.status_extended,
muted=report.muted,
resource_uid=resource_uid,
resource_metadata=report.resource,
resource_name=image_name,
resource_details=report.resource_details,
resource_tags={},
region=report.region,
compliance={},
raw=report.resource,
)
findings.append(finding)
if self._status:
findings = [f for f in findings if f.status in self._status]
self._number_of_checks_completed = i + 1
yield (self.progress, findings)
end_time = datetime.datetime.now()
self._duration = int((end_time - start_time).total_seconds())
return
for check_name in checks_to_execute:
try:
# Recover service from check name

View File

@@ -285,6 +285,9 @@ class Provider(ABC):
timeout=arguments.timeout,
config_path=arguments.config_file,
fixer_config=fixer_config,
registry_username=arguments.registry_username,
registry_password=arguments.registry_password,
registry_token=arguments.registry_token,
)
elif "mongodbatlas" in provider_class_name.lower():
provider_class(

View File

@@ -50,12 +50,40 @@ class ImageBaseException(ProwlerException):
"message": "Invalid image config scanner type.",
"remediation": "Use valid image config scanners: misconfig, secret.",
},
(11011, "ImageDockerLoginError"): {
"message": "Docker login failed for registry authentication.",
"remediation": "Check your registry credentials and ensure the registry is reachable.",
},
(11012, "ImageDockerNotFoundError"): {
"message": "Docker binary not found.",
"remediation": "Install Docker to enable private registry authentication via docker login.",
},
(11013, "ImageRegistryAuthError"): {
"message": "Registry authentication failed.",
"remediation": "Check REGISTRY_USERNAME/REGISTRY_PASSWORD or REGISTRY_TOKEN environment variables.",
},
(11014, "ImageRegistryCatalogError"): {
"message": "Registry does not support catalog listing.",
"remediation": "Use --image or --image-list instead of --registry.",
},
(11015, "ImageRegistryNetworkError"): {
"message": "Network error communicating with registry.",
"remediation": "Check registry URL and network connectivity.",
},
(11016, "ImageMaxImagesExceededError"): {
"message": "Discovered images exceed --max-images limit.",
"remediation": "Use --image-filter or --tag-filter to narrow results, or increase --max-images.",
},
(11017, "ImageInvalidFilterError"): {
"message": "Invalid regex filter pattern.",
"remediation": "Check the regex syntax for --image-filter or --tag-filter.",
},
}
def __init__(self, code, file=None, original_exception=None, message=None):
error_info = self.IMAGE_ERROR_CODES.get((code, self.__class__.__name__))
if message:
error_info["message"] = message
if error_info and message:
error_info = {**error_info, "message": message}
super().__init__(
code,
source="Image",
@@ -162,3 +190,66 @@ class ImageInvalidConfigScannerError(ImageBaseException):
super().__init__(
11010, file=file, original_exception=original_exception, message=message
)
class ImageDockerLoginError(ImageBaseException):
"""Exception raised when Docker login fails."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
11011, file=file, original_exception=original_exception, message=message
)
class ImageDockerNotFoundError(ImageBaseException):
"""Exception raised when Docker binary is not found."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
11012, file=file, original_exception=original_exception, message=message
)
class ImageRegistryAuthError(ImageBaseException):
"""Exception raised when registry authentication fails."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
11013, file=file, original_exception=original_exception, message=message
)
class ImageRegistryCatalogError(ImageBaseException):
"""Exception raised when registry does not support catalog listing."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
11014, file=file, original_exception=original_exception, message=message
)
class ImageRegistryNetworkError(ImageBaseException):
"""Exception raised when a network error occurs communicating with a registry."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
11015, file=file, original_exception=original_exception, message=message
)
class ImageMaxImagesExceededError(ImageBaseException):
"""Exception raised when discovered images exceed --max-images limit."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
11016, file=file, original_exception=original_exception, message=message
)
class ImageInvalidFilterError(ImageBaseException):
"""Exception raised when an invalid regex filter pattern is provided."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
11017, file=file, original_exception=original_exception, message=message
)

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import json
import os
import re
import subprocess
import sys
@@ -21,12 +22,14 @@ from prowler.providers.common.provider import Provider
from prowler.providers.image.exceptions.exceptions import (
ImageFindingProcessingError,
ImageInvalidConfigScannerError,
ImageInvalidFilterError,
ImageInvalidNameError,
ImageInvalidScannerError,
ImageInvalidSeverityError,
ImageInvalidTimeoutError,
ImageListFileNotFoundError,
ImageListFileReadError,
ImageMaxImagesExceededError,
ImageNoImagesProvidedError,
ImageScanError,
ImageTrivyBinaryNotFoundError,
@@ -36,6 +39,8 @@ from prowler.providers.image.lib.arguments.arguments import (
SCANNERS_CHOICES,
SEVERITY_CHOICES,
)
from prowler.providers.image.lib.registry.dockerhub_adapter import DockerHubAdapter
from prowler.providers.image.lib.registry.factory import create_registry_adapter
class ImageProvider(Provider):
@@ -66,12 +71,23 @@ class ImageProvider(Provider):
config_path: str | None = None,
config_content: dict | None = None,
fixer_config: dict | None = None,
registry_username: str | None = None,
registry_password: str | None = None,
registry_token: str | None = None,
registry: str | None = None,
image_filter: str | None = None,
tag_filter: str | None = None,
max_images: int = 0,
registry_insecure: bool = False,
registry_list_images: bool = False,
):
logger.info("Instantiating Image Provider...")
self.images = images if images is not None else []
self.image_list_file = image_list_file
self.scanners = scanners if scanners is not None else ["vuln", "secret"]
self.scanners = (
scanners if scanners is not None else ["vuln", "secret", "misconfig"]
)
self.image_config_scanners = (
image_config_scanners if image_config_scanners is not None else []
)
@@ -82,7 +98,53 @@ class ImageProvider(Provider):
self.audited_account = "image-scan"
self._session = None
self._identity = "prowler"
self._auth_method = "No auth"
self._listing_only = False
# Registry authentication (follows IaC pattern: explicit params, env vars internal)
self.registry_username = registry_username or os.environ.get(
"REGISTRY_USERNAME"
)
self.registry_password = registry_password or os.environ.get(
"REGISTRY_PASSWORD"
)
self.registry_token = registry_token or os.environ.get("REGISTRY_TOKEN")
if self.registry_username and self.registry_password:
self._auth_method = "Docker login"
logger.info("Using docker login for registry authentication")
elif self.registry_token:
self._auth_method = "Registry token"
logger.info("Using registry token for authentication")
else:
self._auth_method = "No auth"
# Registry scan mode
self.registry = registry
self.image_filter = image_filter
self.tag_filter = tag_filter
self.max_images = max_images
self.registry_insecure = registry_insecure
self.registry_list_images = registry_list_images
# Compile regex filters
self._image_filter_re = None
self._tag_filter_re = None
if self.image_filter:
try:
self._image_filter_re = re.compile(self.image_filter)
except re.error as exc:
raise ImageInvalidFilterError(
file=__file__,
message=f"Invalid --image-filter regex '{self.image_filter}': {exc}",
)
if self.tag_filter:
try:
self._tag_filter_re = re.compile(self.tag_filter)
except re.error as exc:
raise ImageInvalidFilterError(
file=__file__,
message=f"Invalid --tag-filter regex '{self.tag_filter}': {exc}",
)
self._validate_inputs()
@@ -90,6 +152,12 @@ class ImageProvider(Provider):
if image_list_file:
self._load_images_from_file(image_list_file)
# Registry scan mode: enumerate images from registry
if self.registry:
self._enumerate_registry()
if self._listing_only:
return
for image in self.images:
self._validate_image_name(image)
@@ -245,37 +313,59 @@ class ImageProvider(Provider):
"""Image provider doesn't need a session since it uses Trivy directly"""
return None
@staticmethod
def _extract_registry(image: str) -> str | None:
"""Extract registry hostname from an image reference.
Returns None for Docker Hub images (no registry prefix).
"""
parts = image.split("/")
if len(parts) >= 2 and ("." in parts[0] or ":" in parts[0]):
return parts[0]
return None
def cleanup(self) -> None:
"""Clean up any resources after scanning."""
def _process_finding(
self, finding: dict, image_name: str, finding_type: str
self,
finding: dict,
image: str,
trivy_target: str,
image_sha: str = "",
) -> CheckReportImage:
"""
Process a single finding and create a CheckReportImage object.
Args:
finding: The finding object from Trivy output
image_name: The container image name being scanned
finding_type: The type of finding (Vulnerability, Secret, etc.)
image: The clean container image name (e.g., "alpine:3.18")
trivy_target: The Trivy target string (e.g., "alpine:3.18 (alpine 3.18.0)")
image_sha: Short SHA from Trivy Metadata.ImageID for resource uniqueness
Returns:
CheckReportImage: The processed check report
"""
try:
# Determine finding ID based on type
# Determine finding ID and category based on type
if "VulnerabilityID" in finding:
finding_id = finding["VulnerabilityID"]
finding_description = finding.get(
"Description", finding.get("Title", "")
)
finding_status = "FAIL"
finding_categories = ["vulnerability"]
elif "RuleID" in finding:
# Secret finding
finding_id = finding["RuleID"]
finding_description = finding.get("Title", "Secret detected")
finding_status = "FAIL"
finding_categories = ["secrets"]
else:
finding_id = finding.get("ID", "UNKNOWN")
finding_description = finding.get("Description", "")
finding_status = finding.get("Status", "FAIL")
finding_categories = []
# Build remediation text for vulnerabilities
remediation_text = ""
@@ -294,7 +384,7 @@ class ImageProvider(Provider):
"CheckID": finding_id,
"CheckTitle": finding.get("Title", finding_id),
"CheckType": ["Container Image Security"],
"ServiceName": finding_type,
"ServiceName": "container-image",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": trivy_severity,
@@ -304,7 +394,7 @@ class ImageProvider(Provider):
"Risk": finding.get(
"Description", "Vulnerability detected in container image"
),
"RelatedUrl": finding.get("PrimaryURL", ""),
"RelatedUrl": "",
"Remediation": {
"Code": {
"NativeIaC": "",
@@ -317,7 +407,7 @@ class ImageProvider(Provider):
"Url": finding.get("PrimaryURL", ""),
},
},
"Categories": [],
"Categories": finding_categories,
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
@@ -327,11 +417,13 @@ class ImageProvider(Provider):
metadata = json.dumps(metadata_dict)
report = CheckReportImage(
metadata=metadata, finding=finding, image_name=image_name
metadata=metadata, finding=finding, image_name=image
)
report.status = finding_status
report.status_extended = self._build_status_extended(finding)
report.region = self.region
report.image_sha = image_sha
report.resource_details = trivy_target
return report
except Exception as error:
@@ -368,10 +460,36 @@ class ImageProvider(Provider):
def run(self) -> list[CheckReportImage]:
"""Execute the container image scan."""
reports = []
for batch in self.run_scan():
reports.extend(batch)
return reports
try:
reports = []
for batch in self.run_scan():
reports.extend(batch)
return reports
finally:
self.cleanup()
def scan_per_image(
self,
) -> Generator[tuple[str, list[CheckReportImage]], None, None]:
"""Scan images one by one, yielding (image_name, findings) per image.
Unlike run() which returns all findings at once, this method yields
after each image completes, enabling progress tracking.
"""
try:
for image in self.images:
try:
image_findings = []
for batch in self._scan_single_image(image):
image_findings.extend(batch)
yield (image, image_findings)
except (ImageScanError, ImageTrivyBinaryNotFoundError):
raise
except Exception as error:
logger.error(f"Error scanning image {image}: {error}")
continue
finally:
self.cleanup()
def run_scan(self) -> Generator[list[CheckReportImage], None, None]:
"""
@@ -454,6 +572,19 @@ class ImageProvider(Provider):
logger.info(f"No findings for image: {image}")
return
# Extract image digest for resource uniqueness
trivy_metadata = output.get("Metadata", {})
image_id = trivy_metadata.get("ImageID", "")
if not image_id:
repo_digests = trivy_metadata.get("RepoDigests", [])
if repo_digests:
image_id = (
repo_digests[0].split("@")[-1]
if "@" in repo_digests[0]
else ""
)
short_sha = image_id.replace("sha256:", "")[:12] if image_id else ""
except json.JSONDecodeError as error:
logger.error(f"Failed to parse Trivy output for {image}: {error}")
logger.debug(f"Trivy stdout: {process.stdout[:500]}")
@@ -464,11 +595,12 @@ class ImageProvider(Provider):
for result in results:
target = result.get("Target", image)
result_type = result.get("Type", "unknown")
# Process Vulnerabilities
for vuln in result.get("Vulnerabilities", []):
report = self._process_finding(vuln, target, result_type)
report = self._process_finding(
vuln, image, target, image_sha=short_sha
)
batch.append(report)
if len(batch) >= self.FINDING_BATCH_SIZE:
yield batch
@@ -476,7 +608,9 @@ class ImageProvider(Provider):
# Process Secrets
for secret in result.get("Secrets", []):
report = self._process_finding(secret, target, "secret")
report = self._process_finding(
secret, image, target, image_sha=short_sha
)
batch.append(report)
if len(batch) >= self.FINDING_BATCH_SIZE:
yield batch
@@ -485,7 +619,7 @@ class ImageProvider(Provider):
# Process Misconfigurations (from Dockerfile)
for misconfig in result.get("Misconfigurations", []):
report = self._process_finding(
misconfig, target, "misconfiguration"
misconfig, image, target, image_sha=short_sha
)
batch.append(report)
if len(batch) >= self.FINDING_BATCH_SIZE:
@@ -507,8 +641,19 @@ class ImageProvider(Provider):
)
logger.error(f"Error scanning image {image}: {error}")
def _build_trivy_env(self) -> dict:
"""Build environment variables for Trivy, injecting registry credentials."""
env = dict(os.environ)
if self.registry_username and self.registry_password:
env["TRIVY_USERNAME"] = self.registry_username
env["TRIVY_PASSWORD"] = self.registry_password
elif self.registry_token:
env["TRIVY_REGISTRY_TOKEN"] = self.registry_token
return env
def _execute_trivy(self, command: list, image: str) -> subprocess.CompletedProcess:
"""Execute Trivy command with optional progress bar."""
env = self._build_trivy_env()
try:
if sys.stdout.isatty():
with alive_bar(
@@ -523,6 +668,7 @@ class ImageProvider(Provider):
command,
capture_output=True,
text=True,
env=env,
)
bar.title = f"-> Scan completed for {image}"
return process
@@ -532,12 +678,13 @@ class ImageProvider(Provider):
command,
capture_output=True,
text=True,
env=env,
)
logger.info(f"Scan completed for {image}")
return process
except (AttributeError, OSError):
logger.info(f"Scanning {image}...")
return subprocess.run(command, capture_output=True, text=True)
return subprocess.run(command, capture_output=True, text=True, env=env)
def _log_trivy_stderr(self, stderr: str) -> None:
"""Parse and log Trivy's stderr output."""
@@ -596,6 +743,105 @@ class ImageProvider(Provider):
return error_msg
def _enumerate_registry(self) -> None:
"""Enumerate images from a registry using the appropriate adapter."""
verify_ssl = not self.registry_insecure
adapter = create_registry_adapter(
registry_url=self.registry,
username=self.registry_username,
password=self.registry_password,
token=self.registry_token,
verify_ssl=verify_ssl,
)
repositories = adapter.list_repositories()
logger.info(
f"Discovered {len(repositories)} repositories from registry {self.registry}"
)
# Apply image filter
if self._image_filter_re:
repositories = [r for r in repositories if self._image_filter_re.search(r)]
logger.info(
f"{len(repositories)} repositories match --image-filter '{self.image_filter}'"
)
if not repositories:
logger.warning(
f"No repositories found in registry {self.registry} (after filtering)"
)
return
# Determine if this is a Docker Hub adapter (for image reference format)
is_dockerhub = isinstance(adapter, DockerHubAdapter)
discovered_images = []
repos_tags: dict[str, list[str]] = {}
for repo in repositories:
tags = adapter.list_tags(repo)
# Apply tag filter
if self._tag_filter_re:
tags = [t for t in tags if self._tag_filter_re.search(t)]
if tags:
repos_tags[repo] = tags
for tag in tags:
if is_dockerhub:
# Docker Hub images don't need a host prefix
image_ref = f"{repo}:{tag}"
else:
# OCI registries need the full host/repo:tag reference
registry_host = self.registry.rstrip("/")
for prefix in ("https://", "http://"):
if registry_host.startswith(prefix):
registry_host = registry_host[len(prefix) :]
break
image_ref = f"{registry_host}/{repo}:{tag}"
discovered_images.append(image_ref)
# Registry list mode: print listing and return early
if self.registry_list_images:
self._print_registry_listing(repos_tags, len(discovered_images))
self._listing_only = True
return
# Check max-images limit
if self.max_images and len(discovered_images) > self.max_images:
raise ImageMaxImagesExceededError(
file=__file__,
message=f"Discovered {len(discovered_images)} images, exceeding --max-images {self.max_images}. Use --image-filter or --tag-filter to narrow results.",
)
# Deduplicate with explicit images
existing = set(self.images)
for img in discovered_images:
if img not in existing:
self.images.append(img)
existing.add(img)
logger.info(
f"Discovered {len(discovered_images)} images from registry {self.registry} "
f"({len(repositories)} repositories). Total images to scan: {len(self.images)}"
)
def _print_registry_listing(
self, repos_tags: dict[str, list[str]], total_images: int
) -> None:
"""Print a structured listing of registry repositories and tags."""
num_repos = len(repos_tags)
print(
f"\n{Style.BRIGHT}Registry:{Style.RESET_ALL} "
f"{Fore.CYAN}{self.registry}{Style.RESET_ALL} "
f"({num_repos} {'repository' if num_repos == 1 else 'repositories'}, "
f"{total_images} {'image' if total_images == 1 else 'images'})\n"
)
for repo, tags in repos_tags.items():
print(f" {Fore.YELLOW}{repo}{Style.RESET_ALL} " f"({len(tags)} tags)")
print(f" {', '.join(tags)}")
print()
def print_credentials(self) -> None:
"""Print scan configuration."""
report_title = f"{Style.BRIGHT}Scanning container images:{Style.RESET_ALL}"
@@ -628,6 +874,23 @@ class ImageProvider(Provider):
report_lines.append(f"Timeout: {Fore.YELLOW}{self.timeout}{Style.RESET_ALL}")
report_lines.append(
f"Authentication method: {Fore.YELLOW}{self.auth_method}{Style.RESET_ALL}"
)
if self.registry:
report_lines.append(
f"Registry: {Fore.YELLOW}{self.registry}{Style.RESET_ALL}"
)
if self.image_filter:
report_lines.append(
f"Image filter: {Fore.YELLOW}{self.image_filter}{Style.RESET_ALL}"
)
if self.tag_filter:
report_lines.append(
f"Tag filter: {Fore.YELLOW}{self.tag_filter}{Style.RESET_ALL}"
)
print_boxes(report_lines, report_title)
@staticmethod
@@ -635,25 +898,57 @@ class ImageProvider(Provider):
image: str | None = None,
raise_on_exception: bool = True,
provider_id: str | None = None,
registry_username: str | None = None,
registry_password: str | None = None,
registry_token: str | None = None,
registry: str | None = None,
) -> "Connection":
"""
Test connection to container registry by attempting to inspect an image.
Test connection to a container registry or image.
When ``registry`` is provided (and no ``image``), tests registry-level
reachability via the OCI catalog endpoint (list repositories).
Otherwise falls back to the Trivy-based single-image test.
Args:
image: Container image to test
raise_on_exception: Whether to raise exceptions
provider_id: Fallback for image name
registry_username: Registry username for basic auth
registry_password: Registry password for basic auth
registry_token: Registry token for token-based auth
registry: Registry URL for registry-level connectivity test
Returns:
Connection: Connection object with success status
"""
try:
# Registry-level test: list repositories to verify reachability
if registry and not image:
adapter = create_registry_adapter(
registry_url=registry,
username=registry_username,
password=registry_password,
token=registry_token,
)
adapter.list_repositories()
return Connection(is_connected=True)
# Image-level test via Trivy
if provider_id and not image:
image = provider_id
if not image:
return Connection(is_connected=False, error="Image name is required")
# Build env with registry credentials
env = dict(os.environ)
if registry_username and registry_password:
env["TRIVY_USERNAME"] = registry_username
env["TRIVY_PASSWORD"] = registry_password
elif registry_token:
env["TRIVY_REGISTRY_TOKEN"] = registry_token
# Test by running trivy with --skip-update to just test image access
process = subprocess.run(
[
@@ -666,6 +961,7 @@ class ImageProvider(Provider):
capture_output=True,
text=True,
timeout=60,
env=env,
)
if process.returncode == 0:

View File

@@ -88,16 +88,96 @@ def init_parser(self):
help="Trivy scan timeout. Default: 5m. Examples: 10m, 1h",
)
# Registry Scan Mode
registry_group = image_parser.add_argument_group("Registry Scan Mode")
registry_group.add_argument(
"--registry",
dest="registry",
default=None,
help="Registry URL to enumerate and scan all images. Examples: myregistry.io, docker.io/myorg, 123456789.dkr.ecr.us-east-1.amazonaws.com",
)
registry_group.add_argument(
"--image-filter",
dest="image_filter",
default=None,
help="Regex to filter repository names during registry enumeration (re.search). Example: '^prod/.*'",
)
registry_group.add_argument(
"--tag-filter",
dest="tag_filter",
default=None,
help=r"Regex to filter tags during registry enumeration (re.search). Example: '^(latest|v\d+\.\d+\.\d+)$'",
)
registry_group.add_argument(
"--max-images",
dest="max_images",
type=int,
default=0,
help="Maximum number of images to scan from registry. 0 = unlimited. Aborts if exceeded.",
)
registry_group.add_argument(
"--registry-insecure",
dest="registry_insecure",
action="store_true",
default=False,
help="Skip TLS verification for registry connections (for self-signed certificates).",
)
registry_group.add_argument(
"--registry-list",
dest="registry_list_images",
action="store_true",
default=False,
help="List all repositories and tags from the registry, then exit without scanning. Useful for discovering available images before building --image-filter or --tag-filter.",
)
def validate_arguments(arguments):
"""Validate Image provider arguments."""
images = getattr(arguments, "images", [])
image_list_file = getattr(arguments, "image_list_file", None)
registry = getattr(arguments, "registry", None)
image_filter = getattr(arguments, "image_filter", None)
tag_filter = getattr(arguments, "tag_filter", None)
max_images = getattr(arguments, "max_images", 0)
registry_insecure = getattr(arguments, "registry_insecure", False)
registry_list_images = getattr(arguments, "registry_list_images", False)
if not images and not image_list_file:
if registry_list_images and not registry:
return (False, "--registry-list requires --registry.")
if not images and not image_list_file and not registry:
return (
False,
"At least one image must be specified using --image (-I) or --image-list.",
"At least one image source must be specified using --image (-I), --image-list, or --registry.",
)
# Registry-only flags require --registry
if not registry:
if image_filter:
return (False, "--image-filter requires --registry.")
if tag_filter:
return (False, "--tag-filter requires --registry.")
if max_images:
return (False, "--max-images requires --registry.")
if registry_insecure:
return (False, "--registry-insecure requires --registry.")
# Docker Hub namespace validation
if registry:
url = registry.rstrip("/")
for prefix in ("https://", "http://"):
if url.startswith(prefix):
url = url[len(prefix) :]
break
stripped = url
for prefix in ("registry-1.docker.io", "docker.io"):
if stripped.startswith(prefix):
stripped = stripped[len(prefix) :].lstrip("/")
if not stripped:
return (
False,
"Docker Hub requires a namespace. Use --registry docker.io/{org_or_user}.",
)
break
return (True, "")

View File

@@ -0,0 +1,141 @@
"""Registry adapter abstract base class."""
from __future__ import annotations
import re
import time
from abc import ABC, abstractmethod
import requests
from prowler.config.config import prowler_version
from prowler.lib.logger import logger
from prowler.providers.image.exceptions.exceptions import ImageRegistryNetworkError
_MAX_RETRIES = 3
_BACKOFF_BASE = 1
_USER_AGENT = f"Prowler/{prowler_version} (registry-adapter)"
class RegistryAdapter(ABC):
"""Abstract base class for registry adapters."""
def __init__(
self,
registry_url: str,
username: str | None = None,
password: str | None = None,
token: str | None = None,
verify_ssl: bool = True,
) -> None:
self.registry_url = registry_url
self.username = username
self._password = password
self._token = token
self.verify_ssl = verify_ssl
@property
def password(self) -> str | None:
return self._password
@property
def token(self) -> str | None:
return self._token
def __getstate__(self) -> dict:
state = self.__dict__.copy()
state["_password"] = "***" if state.get("_password") else None
state["_token"] = "***" if state.get("_token") else None
return state
def __repr__(self) -> str:
return (
f"{self.__class__.__name__}("
f"registry_url={self.registry_url!r}, "
f"username={self.username!r}, "
f"password={'<redacted>' if self._password else None}, "
f"token={'<redacted>' if self._token else None})"
)
@abstractmethod
def list_repositories(self) -> list[str]:
"""Enumerate all repository names in the registry."""
...
@abstractmethod
def list_tags(self, repository: str) -> list[str]:
"""Enumerate all tags for a repository."""
...
def _request_with_retry(self, method: str, url: str, **kwargs) -> requests.Response:
context_label = kwargs.pop("context_label", None) or self.registry_url
kwargs.setdefault("timeout", 30)
kwargs.setdefault("verify", self.verify_ssl)
headers = kwargs.get("headers", {})
headers.setdefault("User-Agent", _USER_AGENT)
kwargs["headers"] = headers
last_exception = None
last_status = None
last_body = None
for attempt in range(1, _MAX_RETRIES + 1):
try:
resp = requests.request(method, url, **kwargs)
if resp.status_code == 429:
last_status = 429
wait = _BACKOFF_BASE * (2 ** (attempt - 1))
logger.warning(
f"Rate limited by {context_label}, retrying in {wait}s (attempt {attempt}/{_MAX_RETRIES})"
)
time.sleep(wait)
continue
if resp.status_code >= 500:
last_status = resp.status_code
last_body = (resp.text or "")[:500]
wait = _BACKOFF_BASE * (2 ** (attempt - 1))
logger.warning(
f"Server error from {context_label} (HTTP {resp.status_code}), "
f"retrying in {wait}s (attempt {attempt}/{_MAX_RETRIES}): {last_body}"
)
time.sleep(wait)
continue
return resp
except requests.exceptions.ConnectionError as exc:
last_exception = exc
if attempt < _MAX_RETRIES:
wait = _BACKOFF_BASE * (2 ** (attempt - 1))
logger.warning(
f"Connection error to {context_label}, retrying in {wait}s (attempt {attempt}/{_MAX_RETRIES})"
)
time.sleep(wait)
continue
except requests.exceptions.Timeout as exc:
raise ImageRegistryNetworkError(
file=__file__,
message=f"Connection timed out to {context_label}.",
original_exception=exc,
)
if last_status == 429:
raise ImageRegistryNetworkError(
file=__file__,
message=f"Rate limited by {context_label} after {_MAX_RETRIES} attempts.",
)
if last_status is not None and last_status >= 500:
raise ImageRegistryNetworkError(
file=__file__,
message=f"Server error from {context_label} (HTTP {last_status}) after {_MAX_RETRIES} attempts: {last_body}",
)
raise ImageRegistryNetworkError(
file=__file__,
message=f"Failed to connect to {context_label} after {_MAX_RETRIES} attempts.",
original_exception=last_exception,
)
@staticmethod
def _next_page_url(resp: requests.Response) -> str | None:
link_header = resp.headers.get("Link", "")
if not link_header:
return None
match = re.search(r'<([^>]+)>;\s*rel="next"', link_header)
if match:
return match.group(1)
return None

View File

@@ -0,0 +1,221 @@
"""Docker Hub registry adapter."""
from __future__ import annotations
import re
from typing import TYPE_CHECKING
from prowler.lib.logger import logger
from prowler.providers.image.exceptions.exceptions import (
ImageRegistryAuthError,
ImageRegistryCatalogError,
ImageRegistryNetworkError,
)
from prowler.providers.image.lib.registry.base import RegistryAdapter
if TYPE_CHECKING:
import requests
_HUB_API = "https://hub.docker.com"
_REGISTRY_HOST = "https://registry-1.docker.io"
_AUTH_URL = "https://auth.docker.io/token"
class DockerHubAdapter(RegistryAdapter):
"""Adapter for Docker Hub using the Hub REST API + OCI tag listing."""
def __init__(
self,
registry_url: str,
username: str | None = None,
password: str | None = None,
token: str | None = None,
verify_ssl: bool = True,
) -> None:
if not verify_ssl:
logger.warning(
"Docker Hub always uses TLS verification; --registry-insecure is ignored for Docker Hub registries."
)
super().__init__(registry_url, username, password, token, verify_ssl=True)
self.namespace = self._extract_namespace(registry_url)
self._hub_jwt: str | None = None
self._registry_tokens: dict[str, str] = {}
@staticmethod
def _extract_namespace(registry_url: str) -> str:
url = registry_url.rstrip("/")
for prefix in (
"https://registry-1.docker.io",
"http://registry-1.docker.io",
"https://docker.io",
"http://docker.io",
"registry-1.docker.io",
"docker.io",
"https://",
"http://",
):
if url.startswith(prefix):
url = url[len(prefix) :]
break
url = url.lstrip("/")
parts = url.split("/")
namespace = parts[0] if parts and parts[0] else ""
return namespace
def list_repositories(self) -> list[str]:
if not self.namespace:
raise ImageRegistryCatalogError(
file=__file__,
message="Docker Hub requires a namespace. Use --registry docker.io/{org_or_user}.",
)
self._hub_login()
repositories: list[str] = []
if self._hub_jwt:
url = f"{_HUB_API}/v2/namespaces/{self.namespace}/repositories"
else:
url = f"{_HUB_API}/v2/repositories/{self.namespace}/"
params: dict = {"page_size": 100}
while url:
resp = self._hub_request("GET", url, params=params)
self._check_hub_response(resp, "repository listing")
data = resp.json()
for repo in data.get("results", []):
name = repo.get("name", "")
if name:
repositories.append(f"{self.namespace}/{name}")
url = data.get("next")
params = {}
return repositories
def list_tags(self, repository: str) -> list[str]:
token = self._get_registry_token(repository)
tags: list[str] = []
url = f"{_REGISTRY_HOST}/v2/{repository}/tags/list"
params: dict = {"n": 100}
while url:
resp = self._registry_request("GET", url, token, params=params)
if resp.status_code in (401, 403):
raise ImageRegistryAuthError(
file=__file__,
message=f"Authentication failed for tag listing of {repository} on Docker Hub. Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
)
if resp.status_code != 200:
logger.warning(
f"Failed to list tags for {repository} (HTTP {resp.status_code}): {resp.text[:200]}"
)
break
data = resp.json()
tags.extend(data.get("tags", []) or [])
url = self._next_tag_page_url(resp)
params = {}
return tags
def _hub_login(self) -> None:
if self._hub_jwt:
return
if not self.username or not self.password:
return
logger.debug(f"Docker Hub login attempt for username: {self.username!r}")
resp = self._request_with_retry(
"POST",
f"{_HUB_API}/v2/users/login",
json={"username": self.username, "password": self.password},
context_label="Docker Hub",
)
if resp.status_code != 200:
body_preview = resp.text[:200] if resp.text else "(empty body)"
raise ImageRegistryAuthError(
file=__file__,
message=(
f"Docker Hub login failed (HTTP {resp.status_code}). "
f"Check REGISTRY_USERNAME and REGISTRY_PASSWORD. "
f"Response: {body_preview}"
),
)
self._hub_jwt = resp.json().get("token")
if not self._hub_jwt:
raise ImageRegistryAuthError(
file=__file__,
message="Docker Hub login returned an empty JWT token. Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
)
def _get_registry_token(self, repository: str) -> str:
if repository in self._registry_tokens:
return self._registry_tokens[repository]
params = {
"service": "registry.docker.io",
"scope": f"repository:{repository}:pull",
}
auth = None
if self.username and self.password:
auth = (self.username, self.password)
resp = self._request_with_retry(
"GET",
_AUTH_URL,
params=params,
auth=auth,
context_label="Docker Hub",
)
if resp.status_code != 200:
raise ImageRegistryAuthError(
file=__file__,
message=f"Failed to obtain Docker Hub registry token for {repository} (HTTP {resp.status_code}). Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
)
token = resp.json().get("token", "")
if not token:
raise ImageRegistryAuthError(
file=__file__,
message=f"Docker Hub registry token endpoint returned an empty token for {repository}. Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
)
self._registry_tokens[repository] = token
return token
def _hub_request(self, method: str, url: str, **kwargs) -> requests.Response:
headers = kwargs.pop("headers", {})
if self._hub_jwt:
headers["Authorization"] = f"Bearer {self._hub_jwt}"
kwargs["headers"] = headers
return self._request_with_retry(
method, url, context_label="Docker Hub", **kwargs
)
def _registry_request(
self, method: str, url: str, token: str, **kwargs
) -> requests.Response:
headers = kwargs.pop("headers", {})
headers["Authorization"] = f"Bearer {token}"
kwargs["headers"] = headers
return self._request_with_retry(
method, url, context_label="Docker Hub", **kwargs
)
def _check_hub_response(self, resp: requests.Response, context: str) -> None:
if resp.status_code == 200:
return
if resp.status_code in (401, 403):
raise ImageRegistryAuthError(
file=__file__,
message=f"Authentication failed for {context} on Docker Hub (HTTP {resp.status_code}). Check REGISTRY_USERNAME and REGISTRY_PASSWORD environment variables.",
)
if resp.status_code == 404:
raise ImageRegistryCatalogError(
file=__file__,
message=f"Namespace '{self.namespace}' not found on Docker Hub. Check the namespace in --registry docker.io/{{namespace}}.",
)
raise ImageRegistryNetworkError(
file=__file__,
message=f"Unexpected error during {context} on Docker Hub (HTTP {resp.status_code}): {resp.text[:200]}",
)
@staticmethod
def _next_tag_page_url(resp: requests.Response) -> str | None:
link_header = resp.headers.get("Link", "")
if not link_header:
return None
match = re.search(r'<([^>]+)>;\s*rel="next"', link_header)
if match:
next_url = match.group(1)
if next_url.startswith("/"):
return f"{_REGISTRY_HOST}{next_url}"
return next_url
return None

View File

@@ -0,0 +1,40 @@
"""Factory for auto-detecting registry type and returning the appropriate adapter."""
from __future__ import annotations
import re
from prowler.providers.image.lib.registry.base import RegistryAdapter
from prowler.providers.image.lib.registry.dockerhub_adapter import DockerHubAdapter
from prowler.providers.image.lib.registry.oci_adapter import OciRegistryAdapter
_DOCKER_HUB_PATTERN = re.compile(
r"^(https?://)?(docker\.io|registry-1\.docker\.io)(/|$)", re.IGNORECASE
)
def create_registry_adapter(
registry_url: str,
username: str | None = None,
password: str | None = None,
token: str | None = None,
verify_ssl: bool = True,
) -> RegistryAdapter:
"""Auto-detect registry type from URL and return the appropriate adapter."""
if _DOCKER_HUB_PATTERN.search(registry_url):
return DockerHubAdapter(
registry_url=registry_url,
username=username,
password=password,
token=token,
verify_ssl=verify_ssl,
)
# ECR and other non-Docker-Hub registries implement the OCI Distribution Spec,
# so they are handled by the generic OCI adapter.
return OciRegistryAdapter(
registry_url=registry_url,
username=username,
password=password,
token=token,
verify_ssl=verify_ssl,
)

View File

@@ -0,0 +1,228 @@
"""Generic OCI Distribution Spec registry adapter."""
from __future__ import annotations
import base64
import ipaddress
import re
from typing import TYPE_CHECKING
from urllib.parse import urlparse
from prowler.lib.logger import logger
from prowler.providers.image.exceptions.exceptions import (
ImageRegistryAuthError,
ImageRegistryCatalogError,
ImageRegistryNetworkError,
)
from prowler.providers.image.lib.registry.base import RegistryAdapter
if TYPE_CHECKING:
import requests
class OciRegistryAdapter(RegistryAdapter):
"""Adapter for registries implementing OCI Distribution Spec."""
def __init__(
self,
registry_url: str,
username: str | None = None,
password: str | None = None,
token: str | None = None,
verify_ssl: bool = True,
) -> None:
super().__init__(registry_url, username, password, token, verify_ssl)
self._base_url = self._normalise_url(registry_url)
self._bearer_token: str | None = None
self._basic_auth_verified = False
@staticmethod
def _normalise_url(url: str) -> str:
url = url.rstrip("/")
if not url.startswith(("http://", "https://")):
url = f"https://{url}"
return url
def list_repositories(self) -> list[str]:
self._ensure_auth()
repositories: list[str] = []
url = f"{self._base_url}/v2/_catalog"
params: dict = {"n": 200}
while url:
resp = self._authed_request("GET", url, params=params)
if resp.status_code == 404:
raise ImageRegistryCatalogError(
file=__file__,
message=f"Registry at {self.registry_url} does not support catalog listing (/_catalog returned 404). Use --image or --image-list instead.",
)
self._check_response(resp, "catalog listing")
data = resp.json()
repositories.extend(data.get("repositories", []))
url = self._next_page_url(resp)
params = {}
return repositories
def list_tags(self, repository: str) -> list[str]:
self._ensure_auth(repository=repository)
tags: list[str] = []
url = f"{self._base_url}/v2/{repository}/tags/list"
params: dict = {"n": 200}
while url:
resp = self._authed_request("GET", url, params=params)
self._check_response(resp, f"tag listing for {repository}")
data = resp.json()
tags.extend(data.get("tags", []) or [])
url = self._next_page_url(resp)
params = {}
return tags
def _ensure_auth(self, repository: str | None = None) -> None:
if self._bearer_token:
return
if self._basic_auth_verified:
return
if self.token:
self._bearer_token = self.token
return
ping_url = f"{self._base_url}/v2/"
resp = self._request_with_retry("GET", ping_url)
if resp.status_code == 200:
return
if resp.status_code == 401:
www_auth = resp.headers.get("Www-Authenticate", "")
if not www_auth.lower().startswith("bearer"):
# Basic auth challenge (e.g., AWS ECR)
if self.username and self.password:
self._basic_auth_verified = True
return
raise ImageRegistryAuthError(
file=__file__,
message=(
f"Registry {self.registry_url} requires authentication "
f"but no credentials provided. "
f"Set REGISTRY_USERNAME and REGISTRY_PASSWORD."
),
)
# Bearer token exchange (standard OCI flow)
self._bearer_token = self._obtain_bearer_token(www_auth, repository)
return
if resp.status_code == 403:
raise ImageRegistryAuthError(
file=__file__,
message=f"Access denied to registry {self.registry_url} (HTTP 403). Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
)
raise ImageRegistryNetworkError(
file=__file__,
message=f"Unexpected HTTP {resp.status_code} from registry {self.registry_url} during auth check.",
)
def _obtain_bearer_token(
self, www_authenticate: str, repository: str | None = None
) -> str:
match = re.search(r'realm="([^"]+)"', www_authenticate)
if not match:
raise ImageRegistryAuthError(
file=__file__,
message=f"Cannot parse token endpoint from registry {self.registry_url}. Www-Authenticate: {www_authenticate[:200]}",
)
realm = match.group(1)
self._validate_realm_url(realm)
params: dict = {}
service_match = re.search(r'service="([^"]+)"', www_authenticate)
if service_match:
params["service"] = service_match.group(1)
scope_match = re.search(r'scope="([^"]+)"', www_authenticate)
if scope_match:
params["scope"] = scope_match.group(1)
elif repository:
params["scope"] = f"repository:{repository}:pull"
auth = None
if self.username and self.password:
auth = (self.username, self.password)
resp = self._request_with_retry("GET", realm, params=params, auth=auth)
if resp.status_code != 200:
raise ImageRegistryAuthError(
file=__file__,
message=f"Failed to obtain bearer token from {realm} (HTTP {resp.status_code}). Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
)
data = resp.json()
token = data.get("token") or data.get("access_token", "")
if not token:
raise ImageRegistryAuthError(
file=__file__,
message=f"Token endpoint {realm} returned an empty token. Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
)
return token
@staticmethod
def _validate_realm_url(realm: str) -> None:
parsed = urlparse(realm)
if parsed.scheme not in ("http", "https"):
raise ImageRegistryAuthError(
file=__file__,
message=f"Bearer token realm has disallowed scheme: {parsed.scheme}. Only http/https are allowed.",
)
if parsed.scheme == "http":
logger.warning(f"Bearer token realm uses HTTP (not HTTPS): {realm}")
hostname = parsed.hostname or ""
try:
addr = ipaddress.ip_address(hostname)
if addr.is_private or addr.is_loopback or addr.is_link_local:
raise ImageRegistryAuthError(
file=__file__,
message=f"Bearer token realm points to a private/loopback address: {hostname}. This may indicate an SSRF attempt.",
)
except ValueError:
pass
def _resolve_basic_credentials(self) -> tuple[str | None, str | None]:
"""Decode pre-encoded base64 auth tokens (e.g., from aws ecr get-authorization-token).
Returns (username, password) — decoded if the password is a base64 token
containing 'username:real_password', otherwise returned as-is.
"""
if not self.password:
return self.username, self.password
try:
decoded = base64.b64decode(self.password).decode("utf-8")
if decoded.startswith(f"{self.username}:"):
return self.username, decoded[len(self.username) + 1 :]
except (ValueError, UnicodeDecodeError):
logger.debug("Password is not a base64-encoded auth token, using as-is")
return self.username, self.password
def _authed_request(self, method: str, url: str, **kwargs) -> requests.Response:
resp = self._do_authed_request(method, url, **kwargs)
if resp.status_code == 401 and self._bearer_token:
logger.debug(
f"Bearer token rejected (HTTP 401), re-authenticating to {self.registry_url}"
)
self._bearer_token = None
self._ensure_auth()
resp = self._do_authed_request(method, url, **kwargs)
return resp
def _do_authed_request(self, method: str, url: str, **kwargs) -> requests.Response:
headers = kwargs.pop("headers", {})
if self._bearer_token:
headers["Authorization"] = f"Bearer {self._bearer_token}"
elif self.username and self.password:
user, pwd = self._resolve_basic_credentials()
kwargs.setdefault("auth", (user, pwd))
kwargs["headers"] = headers
return self._request_with_retry(method, url, **kwargs)
def _check_response(self, resp: requests.Response, context: str) -> None:
if resp.status_code == 200:
return
if resp.status_code in (401, 403):
raise ImageRegistryAuthError(
file=__file__,
message=f"Authentication failed for {context} on {self.registry_url} (HTTP {resp.status_code}). Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
)
raise ImageRegistryNetworkError(
file=__file__,
message=f"Unexpected error during {context} on {self.registry_url} (HTTP {resp.status_code}): {resp.text[:200]}",
)

View File

@@ -45,8 +45,16 @@ SAMPLE_UNKNOWN_SEVERITY_FINDING = {
"Description": "An issue with unknown severity.",
}
# Sample image SHA for testing (first 12 chars of a sha256 digest)
SAMPLE_IMAGE_SHA = "c1aabb73d233"
SAMPLE_IMAGE_ID = f"sha256:{SAMPLE_IMAGE_SHA}abcdef1234567890"
# Full Trivy JSON output structure with a single vulnerability
SAMPLE_TRIVY_IMAGE_OUTPUT = {
"Metadata": {
"ImageID": SAMPLE_IMAGE_ID,
"RepoDigests": [f"alpine@sha256:{SAMPLE_IMAGE_SHA}abcdef1234567890"],
},
"Results": [
{
"Target": "alpine:3.18 (alpine 3.18.0)",
@@ -55,11 +63,15 @@ SAMPLE_TRIVY_IMAGE_OUTPUT = {
"Secrets": [],
"Misconfigurations": [],
}
]
],
}
# Full Trivy JSON output with mixed finding types
SAMPLE_TRIVY_MULTI_TYPE_OUTPUT = {
"Metadata": {
"ImageID": SAMPLE_IMAGE_ID,
"RepoDigests": [f"myimage@sha256:{SAMPLE_IMAGE_SHA}abcdef1234567890"],
},
"Results": [
{
"Target": "myimage:latest (debian 12)",
@@ -68,7 +80,36 @@ SAMPLE_TRIVY_MULTI_TYPE_OUTPUT = {
"Secrets": [SAMPLE_SECRET_FINDING],
"Misconfigurations": [SAMPLE_MISCONFIGURATION_FINDING],
}
]
],
}
# Trivy output with only RepoDigests (no ImageID) for fallback testing
SAMPLE_TRIVY_REPO_DIGEST_ONLY_OUTPUT = {
"Metadata": {
"RepoDigests": ["alpine@sha256:e5f6g7h8i9j0abcdef1234567890"],
},
"Results": [
{
"Target": "alpine:3.18 (alpine 3.18.0)",
"Type": "alpine",
"Vulnerabilities": [SAMPLE_VULNERABILITY_FINDING],
"Secrets": [],
"Misconfigurations": [],
}
],
}
# Trivy output with no Metadata at all
SAMPLE_TRIVY_NO_METADATA_OUTPUT = {
"Results": [
{
"Target": "alpine:3.18 (alpine 3.18.0)",
"Type": "alpine",
"Vulnerabilities": [SAMPLE_VULNERABILITY_FINDING],
"Secrets": [],
"Misconfigurations": [],
}
],
}
@@ -90,3 +131,13 @@ def get_invalid_trivy_output():
def get_multi_type_trivy_output():
"""Return Trivy output with multiple finding types as string."""
return json.dumps(SAMPLE_TRIVY_MULTI_TYPE_OUTPUT)
def get_repo_digest_only_trivy_output():
"""Return Trivy output with only RepoDigests (no ImageID) as string."""
return json.dumps(SAMPLE_TRIVY_REPO_DIGEST_ONLY_OUTPUT)
def get_no_metadata_trivy_output():
"""Return Trivy output with no Metadata as string."""
return json.dumps(SAMPLE_TRIVY_NO_METADATA_OUTPUT)

View File

@@ -1,3 +1,4 @@
import os
import tempfile
from unittest import mock
from unittest.mock import MagicMock, patch
@@ -19,6 +20,7 @@ from prowler.providers.image.exceptions.exceptions import (
)
from prowler.providers.image.image_provider import ImageProvider
from tests.providers.image.image_fixtures import (
SAMPLE_IMAGE_SHA,
SAMPLE_MISCONFIGURATION_FINDING,
SAMPLE_SECRET_FINDING,
SAMPLE_UNKNOWN_SEVERITY_FINDING,
@@ -26,6 +28,8 @@ from tests.providers.image.image_fixtures import (
get_empty_trivy_output,
get_invalid_trivy_output,
get_multi_type_trivy_output,
get_no_metadata_trivy_output,
get_repo_digest_only_trivy_output,
get_sample_trivy_json_output,
)
@@ -48,7 +52,7 @@ class TestImageProvider:
assert provider._type == "image"
assert provider.type == "image"
assert provider.images == ["alpine:3.18"]
assert provider.scanners == ["vuln", "secret"]
assert provider.scanners == ["vuln", "secret", "misconfig"]
assert provider.image_config_scanners == []
assert provider.trivy_severity == []
assert provider.ignore_unfixed is False
@@ -119,22 +123,27 @@ class TestImageProvider:
provider = _make_provider()
report = provider._process_finding(
SAMPLE_VULNERABILITY_FINDING,
"alpine:3.18",
"alpine:3.18 (alpine 3.18.0)",
"alpine",
image_sha="c1aabb73d233",
)
assert isinstance(report, CheckReportImage)
assert report.status == "FAIL"
assert report.check_metadata.CheckID == "CVE-2024-1234"
assert report.check_metadata.Severity == "high"
assert report.check_metadata.ServiceName == "alpine"
assert report.check_metadata.ServiceName == "container-image"
assert report.check_metadata.ResourceType == "container-image"
assert report.check_metadata.ResourceGroup == "container"
assert report.package_name == "openssl"
assert report.installed_version == "1.1.1k-r0"
assert report.fixed_version == "1.1.1l-r0"
assert report.resource_name == "alpine:3.18 (alpine 3.18.0)"
assert report.resource_name == "alpine:3.18"
assert report.image_sha == "c1aabb73d233"
assert report.resource_details == "alpine:3.18 (alpine 3.18.0)"
assert report.region == "container"
assert report.check_metadata.Categories == ["vulnerability"]
assert report.check_metadata.RelatedUrl == ""
def test_process_finding_secret(self):
"""Test processing a secret finding (identified by RuleID)."""
@@ -142,14 +151,15 @@ class TestImageProvider:
report = provider._process_finding(
SAMPLE_SECRET_FINDING,
"myimage:latest",
"secret",
"myimage:latest (debian 12)",
)
assert isinstance(report, CheckReportImage)
assert report.status == "FAIL"
assert report.check_metadata.CheckID == "aws-access-key-id"
assert report.check_metadata.Severity == "critical"
assert report.check_metadata.ServiceName == "secret"
assert report.check_metadata.ServiceName == "container-image"
assert report.check_metadata.Categories == ["secrets"]
def test_process_finding_misconfiguration(self):
"""Test processing a misconfiguration finding (identified by ID)."""
@@ -157,13 +167,14 @@ class TestImageProvider:
report = provider._process_finding(
SAMPLE_MISCONFIGURATION_FINDING,
"myimage:latest",
"misconfiguration",
"myimage:latest (debian 12)",
)
assert isinstance(report, CheckReportImage)
assert report.check_metadata.CheckID == "DS001"
assert report.check_metadata.Severity == "medium"
assert report.check_metadata.ServiceName == "misconfiguration"
assert report.check_metadata.ServiceName == "container-image"
assert report.check_metadata.Categories == []
def test_process_finding_unknown_severity(self):
"""Test that UNKNOWN severity is mapped to informational."""
@@ -171,7 +182,7 @@ class TestImageProvider:
report = provider._process_finding(
SAMPLE_UNKNOWN_SEVERITY_FINDING,
"myimage:latest",
"alpine",
"myimage:latest (alpine 3.18.0)",
)
assert report.check_metadata.Severity == "informational"
@@ -190,6 +201,9 @@ class TestImageProvider:
assert len(reports) == 1
assert reports[0].check_metadata.CheckID == "CVE-2024-1234"
assert reports[0].image_sha == SAMPLE_IMAGE_SHA
assert reports[0].resource_name == "alpine:3.18"
assert reports[0].check_metadata.ServiceName == "container-image"
@patch("subprocess.run")
def test_run_scan_empty_output(self, mock_subprocess):
@@ -389,6 +403,51 @@ class TestImageProvider:
for _ in provider._scan_single_image("private/image:latest"):
pass
@patch("subprocess.run")
def test_sha_extraction_from_image_id(self, mock_subprocess):
"""Test that image_sha is extracted from Trivy Metadata.ImageID."""
provider = _make_provider()
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
reports = []
for batch in provider._scan_single_image("alpine:3.18"):
reports.extend(batch)
assert len(reports) == 1
assert reports[0].image_sha == SAMPLE_IMAGE_SHA
@patch("subprocess.run")
def test_sha_extraction_fallback_to_repo_digests(self, mock_subprocess):
"""Test that image_sha falls back to RepoDigests when ImageID is absent."""
provider = _make_provider()
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_repo_digest_only_trivy_output(), stderr=""
)
reports = []
for batch in provider._scan_single_image("alpine:3.18"):
reports.extend(batch)
assert len(reports) == 1
assert reports[0].image_sha == "e5f6g7h8i9j0"
@patch("subprocess.run")
def test_sha_extraction_no_metadata(self, mock_subprocess):
"""Test that image_sha is empty when no Metadata is present."""
provider = _make_provider()
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_no_metadata_trivy_output(), stderr=""
)
reports = []
for batch in provider._scan_single_image("alpine:3.18"):
reports.extend(batch)
assert len(reports) == 1
assert reports[0].image_sha == ""
@patch("subprocess.run")
def test_run_scan_propagates_scan_error(self, mock_subprocess):
"""Test that run_scan() re-raises ImageScanError instead of swallowing it."""
@@ -593,3 +652,351 @@ class TestImageProviderNameValidation:
with pytest.raises(ImageListFileReadError):
_make_provider(images=None, image_list_file=file_path)
class TestImageProviderRegistryAuth:
def test_no_auth_by_default(self):
"""Test that no auth is set when no credentials are provided."""
provider = _make_provider()
assert provider.registry_username is None
assert provider.registry_password is None
assert provider.registry_token is None
assert provider.auth_method == "No auth"
def test_basic_auth_with_explicit_params(self):
"""Test basic auth via explicit constructor params."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
assert provider.registry_username == "myuser"
assert provider.registry_password == "mypass"
assert provider.auth_method == "Docker login"
def test_token_auth_with_explicit_param(self):
"""Test token auth via explicit constructor param."""
provider = _make_provider(registry_token="my-token-123")
assert provider.registry_token == "my-token-123"
assert provider.auth_method == "Registry token"
def test_basic_auth_takes_precedence_over_token(self):
"""Test that username/password takes precedence over token."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
registry_token="my-token",
)
assert provider.auth_method == "Docker login"
@patch.dict(
os.environ, {"REGISTRY_USERNAME": "envuser", "REGISTRY_PASSWORD": "envpass"}
)
def test_basic_auth_from_env_vars(self):
"""Test that env vars are used as fallback for basic auth."""
provider = _make_provider()
assert provider.registry_username == "envuser"
assert provider.registry_password == "envpass"
assert provider.auth_method == "Docker login"
@patch.dict(os.environ, {"REGISTRY_TOKEN": "env-token"})
def test_token_auth_from_env_var(self):
"""Test that env var is used as fallback for token auth."""
provider = _make_provider()
assert provider.registry_token == "env-token"
assert provider.auth_method == "Registry token"
@patch.dict(
os.environ, {"REGISTRY_USERNAME": "envuser", "REGISTRY_PASSWORD": "envpass"}
)
def test_explicit_params_override_env_vars(self):
"""Test that explicit params take precedence over env vars."""
provider = _make_provider(
registry_username="explicit",
registry_password="explicit-pass",
)
assert provider.registry_username == "explicit"
assert provider.registry_password == "explicit-pass"
def test_build_trivy_env_no_auth(self):
"""Test that _build_trivy_env returns base env when no auth."""
provider = _make_provider()
env = provider._build_trivy_env()
assert "TRIVY_USERNAME" not in env
assert "TRIVY_PASSWORD" not in env
assert "TRIVY_REGISTRY_TOKEN" not in env
def test_build_trivy_env_basic_auth_sets_env_vars(self):
"""Test that _build_trivy_env injects TRIVY_USERNAME/PASSWORD for native Trivy auth."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
env = provider._build_trivy_env()
assert env["TRIVY_USERNAME"] == "myuser"
assert env["TRIVY_PASSWORD"] == "mypass"
def test_build_trivy_env_token_auth(self):
"""Test that _build_trivy_env injects registry token."""
provider = _make_provider(registry_token="my-token")
env = provider._build_trivy_env()
assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
@patch("subprocess.run")
def test_execute_trivy_sets_trivy_env_with_basic_auth(self, mock_subprocess):
"""Test that _execute_trivy sets TRIVY_USERNAME/PASSWORD for native Trivy auth."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
provider._execute_trivy(["trivy", "image", "alpine:3.18"], "alpine:3.18")
call_kwargs = mock_subprocess.call_args
env = call_kwargs.kwargs.get("env") or call_kwargs[1].get("env")
assert env["TRIVY_USERNAME"] == "myuser"
assert env["TRIVY_PASSWORD"] == "mypass"
@patch("subprocess.run")
def test_test_connection_with_basic_auth(self, mock_subprocess):
"""Test test_connection uses Trivy native auth with TRIVY_USERNAME/PASSWORD env vars."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
result = ImageProvider.test_connection(
image="private.registry.io/myapp:v1",
registry_username="myuser",
registry_password="mypass",
)
assert result.is_connected is True
assert mock_subprocess.call_count == 1
trivy_call = mock_subprocess.call_args
assert trivy_call.args[0][0] == "trivy"
env = trivy_call.kwargs.get("env") or trivy_call[1].get("env")
assert env["TRIVY_USERNAME"] == "myuser"
assert env["TRIVY_PASSWORD"] == "mypass"
@patch("subprocess.run")
def test_test_connection_with_token(self, mock_subprocess):
"""Test test_connection passes token via env."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
result = ImageProvider.test_connection(
image="private.registry.io/myapp:v1",
registry_token="my-token",
)
assert result.is_connected is True
call_kwargs = mock_subprocess.call_args
env = call_kwargs.kwargs.get("env") or call_kwargs[1].get("env")
assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_test_connection_registry_success(self, mock_factory):
"""Test registry-level connection test via list_repositories."""
mock_adapter = MagicMock()
mock_adapter.list_repositories.return_value = ["repo1", "repo2"]
mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(
registry="https://myregistry.io",
registry_username="user",
registry_password="pass",
)
assert result.is_connected is True
mock_factory.assert_called_once_with(
registry_url="https://myregistry.io",
username="user",
password="pass",
token=None,
)
mock_adapter.list_repositories.assert_called_once()
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_test_connection_registry_failure(self, mock_factory):
"""Test registry-level connection test failure."""
mock_adapter = MagicMock()
mock_adapter.list_repositories.side_effect = Exception("401 Unauthorized")
mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(
registry="https://myregistry.io",
raise_on_exception=False,
)
assert result.is_connected is False
assert "401 Unauthorized" in result.error
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_test_connection_registry_with_token(self, mock_factory):
"""Test registry-level connection test with token auth."""
mock_adapter = MagicMock()
mock_adapter.list_repositories.return_value = ["repo1"]
mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(
registry="https://ghcr.io",
registry_token="my-token",
)
assert result.is_connected is True
mock_factory.assert_called_once_with(
registry_url="https://ghcr.io",
username=None,
password=None,
token="my-token",
)
@patch("subprocess.run")
def test_test_connection_image_takes_precedence_over_registry(
self, mock_subprocess
):
"""When both image and registry are provided, image-level Trivy test is used."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
result = ImageProvider.test_connection(
image="alpine:3.18",
registry="https://myregistry.io",
)
assert result.is_connected is True
mock_subprocess.assert_called_once()
def test_print_credentials_shows_auth_method(self):
"""Test that print_credentials outputs the auth method."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
with mock.patch("builtins.print") as mock_print:
provider.print_credentials()
output = " ".join(
str(call.args[0]) for call in mock_print.call_args_list if call.args
)
assert "Docker login" in output
class TestExtractRegistry:
def test_docker_hub_simple(self):
assert ImageProvider._extract_registry("alpine:3.18") is None
def test_docker_hub_with_namespace(self):
assert ImageProvider._extract_registry("andoniaf/test-private:tag") is None
def test_ghcr(self):
assert ImageProvider._extract_registry("ghcr.io/user/image:tag") == "ghcr.io"
def test_ecr(self):
assert (
ImageProvider._extract_registry(
"123456789012.dkr.ecr.us-east-1.amazonaws.com/repo:tag"
)
== "123456789012.dkr.ecr.us-east-1.amazonaws.com"
)
def test_localhost_with_port(self):
assert (
ImageProvider._extract_registry("localhost:5000/myimage:latest")
== "localhost:5000"
)
def test_custom_registry_with_port(self):
assert (
ImageProvider._extract_registry("myregistry.io:5000/image:tag")
== "myregistry.io:5000"
)
def test_digest_reference(self):
assert (
ImageProvider._extract_registry("ghcr.io/user/image@sha256:abc123")
== "ghcr.io"
)
def test_bare_image_name(self):
assert ImageProvider._extract_registry("nginx") is None
class TestCleanup:
def test_cleanup_idempotent(self):
"""Test cleanup is safe to call multiple times."""
provider = _make_provider()
provider.cleanup()
provider.cleanup()
class TestScanPerImage:
@patch("subprocess.run")
def test_yields_per_image(self, mock_subprocess):
"""Test that scan_per_image yields (name, findings) per image."""
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
provider = _make_provider(images=["alpine:3.18", "nginx:latest"])
results = list(provider.scan_per_image())
assert len(results) == 2
for name, findings in results:
assert isinstance(name, str)
assert isinstance(findings, list)
assert all(isinstance(f, CheckReportImage) for f in findings)
@patch("subprocess.run")
def test_reraises_scan_error(self, mock_subprocess):
"""Test that ImageScanError propagates from scan_per_image."""
mock_subprocess.return_value = MagicMock(
returncode=1, stdout="", stderr="scan failed"
)
provider = _make_provider(images=["alpine:3.18"])
with pytest.raises(ImageScanError):
list(provider.scan_per_image())
@patch("subprocess.run")
def test_skips_generic_error(self, mock_subprocess):
"""Test that a generic RuntimeError in _scan_single_image yields empty findings and continues."""
def side_effect(cmd, **kwargs):
if "bad:image" in cmd:
raise RuntimeError("unexpected error")
return MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
mock_subprocess.side_effect = side_effect
provider = _make_provider(images=["bad:image", "alpine:3.18"])
results = list(provider.scan_per_image())
assert len(results) == 2
assert results[0][0] == "bad:image"
assert results[0][1] == []
assert results[1][0] == "alpine:3.18"
assert len(results[1][1]) > 0
@patch("subprocess.run")
def test_calls_cleanup(self, mock_subprocess):
"""Test that cleanup is called even after scan_per_image completes."""
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
provider = _make_provider(images=["alpine:3.18"])
with mock.patch.object(provider, "cleanup") as mock_cleanup:
list(provider.scan_per_image())
mock_cleanup.assert_called_once()

View File

View File

@@ -0,0 +1,223 @@
from argparse import Namespace
from prowler.providers.image.lib.arguments.arguments import validate_arguments
class TestValidateArguments:
def test_no_source_fails(self):
args = Namespace(
images=[],
image_list_file=None,
registry=None,
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=False,
)
ok, msg = validate_arguments(args)
assert not ok
assert "--image" in msg
def test_image_only_passes(self):
args = Namespace(
images=["nginx:latest"],
image_list_file=None,
registry=None,
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=False,
)
ok, _ = validate_arguments(args)
assert ok
def test_image_list_only_passes(self):
args = Namespace(
images=[],
image_list_file="images.txt",
registry=None,
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=False,
)
ok, _ = validate_arguments(args)
assert ok
def test_registry_only_passes(self):
args = Namespace(
images=[],
image_list_file=None,
registry="myregistry.io",
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=False,
)
ok, _ = validate_arguments(args)
assert ok
def test_image_filter_without_registry_fails(self):
args = Namespace(
images=["nginx:latest"],
image_list_file=None,
registry=None,
image_filter="^prod",
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=False,
)
ok, msg = validate_arguments(args)
assert not ok
assert "--image-filter requires --registry" in msg
def test_tag_filter_without_registry_fails(self):
args = Namespace(
images=["nginx:latest"],
image_list_file=None,
registry=None,
image_filter=None,
tag_filter="^v",
max_images=0,
registry_insecure=False,
registry_list_images=False,
)
ok, msg = validate_arguments(args)
assert not ok
assert "--tag-filter requires --registry" in msg
def test_max_images_without_registry_fails(self):
args = Namespace(
images=["nginx:latest"],
image_list_file=None,
registry=None,
image_filter=None,
tag_filter=None,
max_images=50,
registry_insecure=False,
registry_list_images=False,
)
ok, msg = validate_arguments(args)
assert not ok
assert "--max-images requires --registry" in msg
def test_registry_insecure_without_registry_fails(self):
args = Namespace(
images=[],
image_list_file="i.txt",
registry=None,
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=True,
registry_list_images=False,
)
ok, msg = validate_arguments(args)
assert not ok
assert "--registry-insecure requires --registry" in msg
def test_docker_hub_no_namespace_fails(self):
args = Namespace(
images=[],
image_list_file=None,
registry="docker.io",
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=False,
)
ok, msg = validate_arguments(args)
assert not ok
assert "namespace" in msg.lower()
def test_docker_hub_with_namespace_passes(self):
args = Namespace(
images=[],
image_list_file=None,
registry="docker.io/myorg",
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=False,
)
ok, _ = validate_arguments(args)
assert ok
def test_docker_hub_https_no_namespace_fails(self):
args = Namespace(
images=[],
image_list_file=None,
registry="https://docker.io",
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=False,
)
ok, msg = validate_arguments(args)
assert not ok
assert "namespace" in msg.lower()
def test_registry_with_filters_passes(self):
args = Namespace(
images=[],
image_list_file=None,
registry="myregistry.io",
image_filter="^prod",
tag_filter="^v",
max_images=100,
registry_insecure=True,
registry_list_images=False,
)
ok, _ = validate_arguments(args)
assert ok
def test_registry_list_without_registry_fails(self):
args = Namespace(
images=["nginx:latest"],
image_list_file=None,
registry=None,
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=True,
)
ok, msg = validate_arguments(args)
assert not ok
assert "--registry-list requires --registry" in msg
def test_registry_list_with_registry_passes(self):
args = Namespace(
images=[],
image_list_file=None,
registry="myregistry.io",
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=True,
)
ok, _ = validate_arguments(args)
assert ok
def test_combined_registry_and_image_passes(self):
args = Namespace(
images=["nginx:latest"],
image_list_file=None,
registry="myregistry.io",
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=False,
)
ok, _ = validate_arguments(args)
assert ok

View File

@@ -0,0 +1,243 @@
from unittest.mock import MagicMock, patch
import pytest
import requests
from prowler.providers.image.exceptions.exceptions import (
ImageRegistryAuthError,
ImageRegistryCatalogError,
ImageRegistryNetworkError,
)
from prowler.providers.image.lib.registry.dockerhub_adapter import DockerHubAdapter
class TestDockerHubAdapterInit:
def test_extract_namespace_simple(self):
assert DockerHubAdapter._extract_namespace("docker.io/myorg") == "myorg"
def test_extract_namespace_https(self):
assert DockerHubAdapter._extract_namespace("https://docker.io/myorg") == "myorg"
def test_extract_namespace_registry1(self):
assert (
DockerHubAdapter._extract_namespace("registry-1.docker.io/myorg") == "myorg"
)
def test_extract_namespace_empty(self):
assert DockerHubAdapter._extract_namespace("docker.io") == ""
def test_extract_namespace_with_slash(self):
assert DockerHubAdapter._extract_namespace("docker.io/myorg/") == "myorg"
class TestDockerHubListRepositories:
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_list_repos(self, mock_request):
# Hub login (now goes through requests.request via _request_with_retry)
login_resp = MagicMock(status_code=200)
login_resp.json.return_value = {"token": "jwt"}
# Repo listing
repos_resp = MagicMock(status_code=200)
repos_resp.json.return_value = {
"results": [{"name": "app1"}, {"name": "app2"}],
"next": None,
}
mock_request.side_effect = [login_resp, repos_resp]
adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
repos = adapter.list_repositories()
assert repos == ["myorg/app1", "myorg/app2"]
def test_list_repos_no_namespace_raises(self):
adapter = DockerHubAdapter("docker.io")
with pytest.raises(ImageRegistryCatalogError, match="namespace"):
adapter.list_repositories()
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_list_repos_public_no_credentials(self, mock_request):
"""When no credentials are provided, use the public /v2/repositories/{ns}/ endpoint."""
repos_resp = MagicMock(status_code=200)
repos_resp.json.return_value = {
"results": [{"name": "repo1"}, {"name": "repo2"}],
"next": None,
}
mock_request.return_value = repos_resp
adapter = DockerHubAdapter("docker.io/publicns")
repos = adapter.list_repositories()
assert repos == ["publicns/repo1", "publicns/repo2"]
called_url = mock_request.call_args[0][1]
assert "/v2/repositories/publicns/" in called_url
assert "/v2/namespaces/" not in called_url
class TestDockerHubListTags:
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_list_tags(self, mock_request):
# Token exchange (now goes through requests.request via _request_with_retry)
token_resp = MagicMock(status_code=200)
token_resp.json.return_value = {"token": "registry-token"}
# Tag listing
tags_resp = MagicMock(status_code=200, headers={})
tags_resp.json.return_value = {"tags": ["latest", "v1.0"]}
mock_request.side_effect = [token_resp, tags_resp]
adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
tags = adapter.list_tags("myorg/myapp")
assert tags == ["latest", "v1.0"]
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_list_tags_auth_failure(self, mock_request):
# Token exchange
token_resp = MagicMock(status_code=200)
token_resp.json.return_value = {"token": "tok"}
# Tag listing returns 401
tags_resp = MagicMock(status_code=401)
mock_request.side_effect = [token_resp, tags_resp]
adapter = DockerHubAdapter("docker.io/myorg")
with pytest.raises(ImageRegistryAuthError):
adapter.list_tags("myorg/myapp")
class TestDockerHubLogin:
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_login_failure(self, mock_request):
resp = MagicMock(status_code=401, text="invalid credentials")
mock_request.return_value = resp
adapter = DockerHubAdapter("docker.io/myorg", username="bad", password="creds")
with pytest.raises(ImageRegistryAuthError, match="login failed"):
adapter._hub_login()
def test_login_skipped_without_credentials(self):
adapter = DockerHubAdapter("docker.io/myorg")
adapter._hub_login() # Should not raise
assert adapter._hub_jwt is None
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_login_401_includes_response_body(self, mock_request):
resp = MagicMock(
status_code=401, text='{"detail":"Incorrect authentication credentials"}'
)
mock_request.return_value = resp
adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
with pytest.raises(
ImageRegistryAuthError, match="Incorrect authentication credentials"
):
adapter._hub_login()
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_login_500_retried_then_raises_network_error(
self, mock_request, mock_sleep
):
mock_request.return_value = MagicMock(status_code=500)
adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
with pytest.raises(ImageRegistryNetworkError, match="Server error"):
adapter._hub_login()
assert mock_request.call_count == 3
class TestDockerHubRetry:
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_retry_on_429(self, mock_request, mock_sleep):
resp_429 = MagicMock(status_code=429)
resp_200 = MagicMock(status_code=200)
mock_request.side_effect = [resp_429, resp_200]
adapter = DockerHubAdapter("docker.io/myorg")
result = adapter._request_with_retry(
"GET", "https://hub.docker.com/v2/namespaces/myorg/repositories"
)
assert result.status_code == 200
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_connection_error_retries(self, mock_request, mock_sleep):
mock_request.side_effect = requests.exceptions.ConnectionError("fail")
adapter = DockerHubAdapter("docker.io/myorg")
with pytest.raises(ImageRegistryNetworkError):
adapter._request_with_retry("GET", "https://hub.docker.com")
assert mock_request.call_count == 3
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_retry_on_500(self, mock_request, mock_sleep):
resp_500 = MagicMock(status_code=500)
resp_200 = MagicMock(status_code=200)
mock_request.side_effect = [resp_500, resp_200]
adapter = DockerHubAdapter("docker.io/myorg")
result = adapter._request_with_retry("GET", "https://hub.docker.com")
assert result.status_code == 200
assert mock_request.call_count == 2
mock_sleep.assert_called_once()
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_retry_exhausted_on_500_raises_network_error(
self, mock_request, mock_sleep
):
mock_request.return_value = MagicMock(status_code=500)
adapter = DockerHubAdapter("docker.io/myorg")
with pytest.raises(
ImageRegistryNetworkError, match="Server error.*HTTP 500.*3 attempts"
):
adapter._request_with_retry("GET", "https://hub.docker.com")
assert mock_request.call_count == 3
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_4xx_not_retried(self, mock_request, mock_sleep):
mock_request.return_value = MagicMock(status_code=403)
adapter = DockerHubAdapter("docker.io/myorg")
result = adapter._request_with_retry("GET", "https://hub.docker.com")
assert result.status_code == 403
assert mock_request.call_count == 1
mock_sleep.assert_not_called()
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_request_sends_user_agent(self, mock_request):
mock_request.return_value = MagicMock(status_code=200)
adapter = DockerHubAdapter("docker.io/myorg")
adapter._request_with_retry("GET", "https://hub.docker.com")
_, kwargs = mock_request.call_args
from prowler.config.config import prowler_version
assert (
kwargs["headers"]["User-Agent"]
== f"Prowler/{prowler_version} (registry-adapter)"
)
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_retry_500_includes_response_body(self, mock_request, mock_sleep):
resp_500 = MagicMock(status_code=500, text="<html>Cloudflare error</html>")
mock_request.return_value = resp_500
adapter = DockerHubAdapter("docker.io/myorg")
with pytest.raises(ImageRegistryNetworkError, match="Cloudflare error"):
adapter._request_with_retry("GET", "https://hub.docker.com")
class TestDockerHubEmptyTokens:
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_empty_hub_jwt_raises(self, mock_request):
resp = MagicMock(status_code=200)
resp.json.return_value = {"token": ""}
mock_request.return_value = resp
adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
with pytest.raises(ImageRegistryAuthError, match="empty JWT"):
adapter._hub_login()
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_none_hub_jwt_raises(self, mock_request):
resp = MagicMock(status_code=200)
resp.json.return_value = {}
mock_request.return_value = resp
adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
with pytest.raises(ImageRegistryAuthError, match="empty JWT"):
adapter._hub_login()
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_empty_registry_token_raises(self, mock_request):
resp = MagicMock(status_code=200)
resp.json.return_value = {"token": ""}
mock_request.return_value = resp
adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
with pytest.raises(ImageRegistryAuthError, match="empty token"):
adapter._get_registry_token("myorg/myapp")

View File

@@ -0,0 +1,34 @@
from prowler.providers.image.lib.registry.dockerhub_adapter import DockerHubAdapter
from prowler.providers.image.lib.registry.factory import create_registry_adapter
from prowler.providers.image.lib.registry.oci_adapter import OciRegistryAdapter
class TestCreateRegistryAdapter:
def test_docker_hub_returns_dockerhub_adapter(self):
adapter = create_registry_adapter("docker.io/myorg")
assert isinstance(adapter, DockerHubAdapter)
def test_oci_returns_oci_adapter(self):
adapter = create_registry_adapter("myregistry.io")
assert isinstance(adapter, OciRegistryAdapter)
def test_ecr_returns_oci_adapter(self):
adapter = create_registry_adapter("123456789.dkr.ecr.us-east-1.amazonaws.com")
assert isinstance(adapter, OciRegistryAdapter)
def test_passes_credentials(self):
adapter = create_registry_adapter(
"myregistry.io",
username="user",
password="pass",
token="tok",
verify_ssl=False,
)
assert adapter.username == "user"
assert adapter.password == "pass"
assert adapter.token == "tok"
assert adapter.verify_ssl is False
def test_registry_1_docker_io(self):
adapter = create_registry_adapter("registry-1.docker.io/myorg")
assert isinstance(adapter, DockerHubAdapter)

View File

@@ -0,0 +1,418 @@
import base64
from unittest.mock import MagicMock, patch
import pytest
import requests
from prowler.providers.image.exceptions.exceptions import (
ImageRegistryAuthError,
ImageRegistryCatalogError,
ImageRegistryNetworkError,
)
from prowler.providers.image.lib.registry.oci_adapter import OciRegistryAdapter
class TestOciAdapterInit:
def test_normalise_url_adds_https(self):
adapter = OciRegistryAdapter("myregistry.io")
assert adapter._base_url == "https://myregistry.io"
def test_normalise_url_keeps_http(self):
adapter = OciRegistryAdapter("http://myregistry.io")
assert adapter._base_url == "http://myregistry.io"
def test_normalise_url_strips_trailing_slash(self):
adapter = OciRegistryAdapter("https://myregistry.io/")
assert adapter._base_url == "https://myregistry.io"
def test_stores_credentials(self):
adapter = OciRegistryAdapter(
"reg.io", username="u", password="p", token="t", verify_ssl=False
)
assert adapter.username == "u"
assert adapter.password == "p"
assert adapter.token == "t"
assert adapter.verify_ssl is False
class TestOciAdapterAuth:
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_ensure_auth_with_token(self, mock_request):
adapter = OciRegistryAdapter("reg.io", token="my-token")
adapter._ensure_auth()
assert adapter._bearer_token == "my-token"
mock_request.assert_not_called()
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_ensure_auth_anonymous_ok(self, mock_request):
resp = MagicMock(status_code=200)
mock_request.return_value = resp
adapter = OciRegistryAdapter("reg.io")
adapter._ensure_auth()
assert adapter._bearer_token is None
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_ensure_auth_bearer_challenge(self, mock_request):
ping_resp = MagicMock(
status_code=401,
headers={
"Www-Authenticate": 'Bearer realm="https://auth.example.com/token",service="registry"'
},
)
token_resp = MagicMock(status_code=200)
token_resp.json.return_value = {"token": "bearer-tok"}
mock_request.side_effect = [ping_resp, token_resp]
adapter = OciRegistryAdapter("reg.io", username="u", password="p")
adapter._ensure_auth()
assert adapter._bearer_token == "bearer-tok"
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_ensure_auth_403_raises(self, mock_request):
resp = MagicMock(status_code=403)
mock_request.return_value = resp
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryAuthError):
adapter._ensure_auth()
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_ensure_auth_basic_challenge_with_creds(self, mock_request):
ping_resp = MagicMock(
status_code=401,
headers={"Www-Authenticate": 'Basic realm="https://ecr.aws"'},
)
mock_request.return_value = ping_resp
adapter = OciRegistryAdapter("ecr.aws", username="AWS", password="tok")
adapter._ensure_auth()
assert adapter._basic_auth_verified is True
assert adapter._bearer_token is None
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_ensure_auth_basic_challenge_no_creds(self, mock_request):
ping_resp = MagicMock(
status_code=401,
headers={"Www-Authenticate": 'Basic realm="https://ecr.aws"'},
)
mock_request.return_value = ping_resp
adapter = OciRegistryAdapter("ecr.aws")
with pytest.raises(ImageRegistryAuthError):
adapter._ensure_auth()
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_basic_auth_used_in_requests(self, mock_request):
ping_resp = MagicMock(
status_code=401,
headers={"Www-Authenticate": 'Basic realm="https://ecr.aws"'},
)
catalog_resp = MagicMock(status_code=200, headers={})
catalog_resp.json.return_value = {"repositories": ["myapp"]}
mock_request.side_effect = [ping_resp, catalog_resp]
adapter = OciRegistryAdapter("ecr.aws", username="AWS", password="tok")
adapter._ensure_auth()
adapter._authed_request("GET", "https://ecr.aws/v2/_catalog")
# The catalog request should use Basic auth (auth kwarg), not Bearer header
call_kwargs = mock_request.call_args_list[1][1]
assert call_kwargs.get("auth") == ("AWS", "tok")
assert "Authorization" not in call_kwargs.get("headers", {})
def test_resolve_basic_credentials_decodes_base64_token(self):
raw_password = "real-jwt-password"
encoded = base64.b64encode(f"AWS:{raw_password}".encode()).decode()
adapter = OciRegistryAdapter("ecr.aws", username="AWS", password=encoded)
user, pwd = adapter._resolve_basic_credentials()
assert user == "AWS"
assert pwd == raw_password
def test_resolve_basic_credentials_passthrough_raw_password(self):
adapter = OciRegistryAdapter("ecr.aws", username="AWS", password="plain-pass")
user, pwd = adapter._resolve_basic_credentials()
assert user == "AWS"
assert pwd == "plain-pass"
def test_resolve_basic_credentials_passthrough_invalid_base64(self):
adapter = OciRegistryAdapter(
"ecr.aws", username="AWS", password="not!valid~base64"
)
user, pwd = adapter._resolve_basic_credentials()
assert user == "AWS"
assert pwd == "not!valid~base64"
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_basic_auth_decodes_ecr_token_in_request(self, mock_request):
raw_password = "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.abc"
encoded = base64.b64encode(f"AWS:{raw_password}".encode()).decode()
ping_resp = MagicMock(
status_code=401,
headers={"Www-Authenticate": 'Basic realm="https://ecr.aws"'},
)
catalog_resp = MagicMock(status_code=200, headers={})
catalog_resp.json.return_value = {"repositories": ["myapp"]}
mock_request.side_effect = [ping_resp, catalog_resp]
adapter = OciRegistryAdapter("ecr.aws", username="AWS", password=encoded)
adapter._ensure_auth()
adapter._authed_request("GET", "https://ecr.aws/v2/_catalog")
call_kwargs = mock_request.call_args_list[1][1]
assert call_kwargs.get("auth") == ("AWS", raw_password)
def test_resolve_basic_credentials_none_password(self):
adapter = OciRegistryAdapter("ecr.aws", username="AWS", password=None)
user, pwd = adapter._resolve_basic_credentials()
assert user == "AWS"
assert pwd is None
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_authed_request_retries_on_401_with_bearer(self, mock_request):
adapter = OciRegistryAdapter("reg.io", username="u", password="p")
adapter._bearer_token = "expired-token"
# First request: 401 (expired token)
resp_401 = MagicMock(status_code=401)
# _ensure_auth ping: 401 with bearer challenge
ping_resp = MagicMock(
status_code=401,
headers={
"Www-Authenticate": 'Bearer realm="https://auth.reg.io/token",service="registry"'
},
)
# Token exchange: success
token_resp = MagicMock(status_code=200)
token_resp.json.return_value = {"token": "new-token"}
# Second request: 200 (new token works)
resp_200 = MagicMock(status_code=200)
mock_request.side_effect = [resp_401, ping_resp, token_resp, resp_200]
result = adapter._authed_request("GET", "https://reg.io/v2/myapp/tags/list")
assert result.status_code == 200
assert adapter._bearer_token == "new-token"
assert mock_request.call_count == 4
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_authed_request_no_retry_on_401_without_bearer(self, mock_request):
adapter = OciRegistryAdapter("reg.io", username="u", password="p")
adapter._basic_auth_verified = True
# No bearer token — using basic auth
resp_401 = MagicMock(status_code=401)
mock_request.return_value = resp_401
result = adapter._authed_request("GET", "https://reg.io/v2/_catalog")
assert result.status_code == 401
# Should only be called once (no retry for basic auth)
assert mock_request.call_count == 1
class TestOciAdapterListRepositories:
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_list_repos_single_page(self, mock_request):
ping_resp = MagicMock(status_code=200)
catalog_resp = MagicMock(status_code=200, headers={})
catalog_resp.json.return_value = {
"repositories": ["app/frontend", "app/backend"]
}
mock_request.side_effect = [ping_resp, catalog_resp]
adapter = OciRegistryAdapter("reg.io")
repos = adapter.list_repositories()
assert repos == ["app/frontend", "app/backend"]
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_list_repos_paginated(self, mock_request):
ping_resp = MagicMock(status_code=200)
page1_resp = MagicMock(
status_code=200,
headers={"Link": '<https://reg.io/v2/_catalog?n=200&last=b>; rel="next"'},
)
page1_resp.json.return_value = {"repositories": ["a"]}
page2_resp = MagicMock(status_code=200, headers={})
page2_resp.json.return_value = {"repositories": ["b"]}
mock_request.side_effect = [ping_resp, page1_resp, page2_resp]
adapter = OciRegistryAdapter("reg.io")
repos = adapter.list_repositories()
assert repos == ["a", "b"]
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_list_repos_404_raises(self, mock_request):
ping_resp = MagicMock(status_code=200)
catalog_resp = MagicMock(status_code=404)
mock_request.side_effect = [ping_resp, catalog_resp]
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryCatalogError):
adapter.list_repositories()
class TestOciAdapterListTags:
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_list_tags(self, mock_request):
ping_resp = MagicMock(status_code=200)
tags_resp = MagicMock(status_code=200, headers={})
tags_resp.json.return_value = {"tags": ["latest", "v1.0"]}
mock_request.side_effect = [ping_resp, tags_resp]
adapter = OciRegistryAdapter("reg.io")
tags = adapter.list_tags("myapp")
assert tags == ["latest", "v1.0"]
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_list_tags_null_tags(self, mock_request):
ping_resp = MagicMock(status_code=200)
tags_resp = MagicMock(status_code=200, headers={})
tags_resp.json.return_value = {"tags": None}
mock_request.side_effect = [ping_resp, tags_resp]
adapter = OciRegistryAdapter("reg.io")
tags = adapter.list_tags("myapp")
assert tags == []
class TestOciAdapterRetry:
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_retry_on_429(self, mock_request, mock_sleep):
resp_429 = MagicMock(status_code=429)
resp_200 = MagicMock(status_code=200)
mock_request.side_effect = [resp_429, resp_200]
adapter = OciRegistryAdapter("reg.io")
result = adapter._request_with_retry("GET", "https://reg.io/v2/")
assert result.status_code == 200
mock_sleep.assert_called_once()
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_connection_error_retries(self, mock_request, mock_sleep):
mock_request.side_effect = requests.exceptions.ConnectionError("failed")
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryNetworkError):
adapter._request_with_retry("GET", "https://reg.io/v2/")
assert mock_request.call_count == 3
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_timeout_raises_immediately(self, mock_request):
mock_request.side_effect = requests.exceptions.Timeout("timeout")
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryNetworkError):
adapter._request_with_retry("GET", "https://reg.io/v2/")
assert mock_request.call_count == 1
class TestOciAdapterNextPageUrl:
def test_no_link_header(self):
resp = MagicMock(headers={})
assert OciRegistryAdapter._next_page_url(resp) is None
def test_link_header_with_next(self):
resp = MagicMock(
headers={"Link": '<https://reg.io/v2/_catalog?n=200&last=b>; rel="next"'}
)
assert (
OciRegistryAdapter._next_page_url(resp)
== "https://reg.io/v2/_catalog?n=200&last=b"
)
def test_link_header_no_next(self):
resp = MagicMock(
headers={"Link": '<https://reg.io/v2/_catalog?n=200>; rel="prev"'}
)
assert OciRegistryAdapter._next_page_url(resp) is None
class TestOciAdapterSSRF:
def test_reject_file_scheme(self):
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryAuthError, match="disallowed scheme"):
adapter._validate_realm_url("file:///etc/passwd")
def test_reject_ftp_scheme(self):
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryAuthError, match="disallowed scheme"):
adapter._validate_realm_url("ftp://evil.com/token")
def test_reject_private_ip(self):
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryAuthError, match="private/loopback"):
adapter._validate_realm_url("https://10.0.0.1/token")
def test_reject_loopback(self):
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryAuthError, match="private/loopback"):
adapter._validate_realm_url("https://127.0.0.1/token")
def test_reject_link_local(self):
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryAuthError, match="private/loopback"):
adapter._validate_realm_url("https://169.254.169.254/latest/meta-data")
def test_accept_public_https(self):
adapter = OciRegistryAdapter("reg.io")
# Should not raise
adapter._validate_realm_url("https://auth.example.com/token")
def test_accept_hostname_not_ip(self):
adapter = OciRegistryAdapter("reg.io")
# Hostnames (not IPs) should pass even if they resolve to private IPs
adapter._validate_realm_url("https://internal.corp.com/token")
class TestOciAdapterEmptyToken:
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_empty_bearer_token_raises(self, mock_request):
ping_resp = MagicMock(
status_code=401,
headers={
"Www-Authenticate": 'Bearer realm="https://auth.example.com/token",service="registry"'
},
)
token_resp = MagicMock(status_code=200)
token_resp.json.return_value = {"token": "", "access_token": ""}
mock_request.side_effect = [ping_resp, token_resp]
adapter = OciRegistryAdapter("reg.io", username="u", password="p")
with pytest.raises(ImageRegistryAuthError, match="empty token"):
adapter._ensure_auth()
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_none_bearer_token_raises(self, mock_request):
ping_resp = MagicMock(
status_code=401,
headers={
"Www-Authenticate": 'Bearer realm="https://auth.example.com/token",service="registry"'
},
)
token_resp = MagicMock(status_code=200)
token_resp.json.return_value = {}
mock_request.side_effect = [ping_resp, token_resp]
adapter = OciRegistryAdapter("reg.io", username="u", password="p")
with pytest.raises(ImageRegistryAuthError, match="empty token"):
adapter._ensure_auth()
class TestOciAdapterNarrowExcept:
def test_invalid_utf8_base64_falls_through(self):
# Create a base64 string that decodes to invalid UTF-8
invalid_bytes = base64.b64encode(b"\xff\xfe").decode()
adapter = OciRegistryAdapter("ecr.aws", username="AWS", password=invalid_bytes)
user, pwd = adapter._resolve_basic_credentials()
assert user == "AWS"
assert pwd == invalid_bytes
class TestCredentialRedaction:
def test_getstate_redacts_credentials(self):
adapter = OciRegistryAdapter(
"reg.io", username="u", password="secret", token="tok"
)
state = adapter.__getstate__()
assert state["_password"] == "***"
assert state["_token"] == "***"
assert state["username"] == "u"
assert state["registry_url"] == "reg.io"
def test_getstate_none_credentials(self):
adapter = OciRegistryAdapter("reg.io")
state = adapter.__getstate__()
assert state["_password"] is None
assert state["_token"] is None
def test_repr_redacts_credentials(self):
adapter = OciRegistryAdapter(
"reg.io", username="u", password="s3cret_pw", token="s3cret_tk"
)
r = repr(adapter)
assert "s3cret_pw" not in r
assert "s3cret_tk" not in r
assert "<redacted>" in r
def test_properties_still_work(self):
adapter = OciRegistryAdapter("reg.io", password="secret", token="tok")
assert adapter.password == "secret"
assert adapter.token == "tok"

View File

@@ -0,0 +1,234 @@
import os
from unittest.mock import MagicMock, patch
import pytest
from prowler.providers.image.exceptions.exceptions import (
ImageInvalidFilterError,
ImageMaxImagesExceededError,
)
from prowler.providers.image.image_provider import ImageProvider
from prowler.providers.image.lib.registry.dockerhub_adapter import DockerHubAdapter
_CLEAN_ENV = {
"PATH": os.environ.get("PATH", ""),
"HOME": os.environ.get("HOME", ""),
}
def _build_provider(**overrides):
defaults = dict(
images=[],
registry="myregistry.io",
image_filter=None,
tag_filter=None,
max_images=0,
registry_insecure=False,
registry_list_images=False,
config_content={"image": {}},
)
defaults.update(overrides)
with patch.dict(os.environ, _CLEAN_ENV, clear=True):
return ImageProvider(**defaults)
class TestRegistryEnumeration:
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_enumerate_oci_registry(self, mock_factory):
adapter = MagicMock()
adapter.list_repositories.return_value = ["app/frontend", "app/backend"]
adapter.list_tags.side_effect = [["latest", "v1.0"], ["latest"]]
mock_factory.return_value = adapter
provider = _build_provider()
assert "myregistry.io/app/frontend:latest" in provider.images
assert "myregistry.io/app/frontend:v1.0" in provider.images
assert "myregistry.io/app/backend:latest" in provider.images
assert len(provider.images) == 3
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_image_filter(self, mock_factory):
adapter = MagicMock()
adapter.list_repositories.return_value = ["prod/app", "dev/app", "staging/app"]
adapter.list_tags.return_value = ["latest"]
mock_factory.return_value = adapter
provider = _build_provider(image_filter="^prod/")
assert len(provider.images) == 1
assert "myregistry.io/prod/app:latest" in provider.images
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_tag_filter(self, mock_factory):
adapter = MagicMock()
adapter.list_repositories.return_value = ["myapp"]
adapter.list_tags.return_value = ["latest", "v1.0", "v2.0", "dev-abc123"]
mock_factory.return_value = adapter
provider = _build_provider(tag_filter=r"^v\d+\.\d+$")
assert len(provider.images) == 2
assert "myregistry.io/myapp:v1.0" in provider.images
assert "myregistry.io/myapp:v2.0" in provider.images
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_combined_filters(self, mock_factory):
adapter = MagicMock()
adapter.list_repositories.return_value = ["prod/app", "dev/app"]
adapter.list_tags.return_value = ["latest", "v1.0"]
mock_factory.return_value = adapter
provider = _build_provider(image_filter="^prod/", tag_filter="^v")
assert len(provider.images) == 1
assert "myregistry.io/prod/app:v1.0" in provider.images
class TestMaxImages:
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_max_images_exceeded(self, mock_factory):
adapter = MagicMock()
adapter.list_repositories.return_value = ["app1", "app2", "app3"]
adapter.list_tags.return_value = ["latest", "v1.0"]
mock_factory.return_value = adapter
with pytest.raises(ImageMaxImagesExceededError):
_build_provider(max_images=2)
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_max_images_not_exceeded(self, mock_factory):
adapter = MagicMock()
adapter.list_repositories.return_value = ["app1"]
adapter.list_tags.return_value = ["latest"]
mock_factory.return_value = adapter
provider = _build_provider(max_images=10)
assert len(provider.images) == 1
class TestDeduplication:
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_deduplication_with_explicit_images(self, mock_factory):
adapter = MagicMock()
adapter.list_repositories.return_value = ["myapp"]
adapter.list_tags.return_value = ["latest"]
mock_factory.return_value = adapter
provider = _build_provider(images=["myregistry.io/myapp:latest"])
assert provider.images.count("myregistry.io/myapp:latest") == 1
class TestInvalidFilters:
def test_invalid_image_filter_regex(self):
with pytest.raises(ImageInvalidFilterError):
_build_provider(image_filter="[invalid")
def test_invalid_tag_filter_regex(self):
with pytest.raises(ImageInvalidFilterError):
_build_provider(tag_filter="(unclosed")
class TestRegistryInsecure:
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_insecure_passes_verify_false(self, mock_factory):
adapter = MagicMock()
adapter.list_repositories.return_value = ["app"]
adapter.list_tags.return_value = ["latest"]
mock_factory.return_value = adapter
_build_provider(registry_insecure=True)
mock_factory.assert_called_once()
call_kwargs = mock_factory.call_args[1]
assert call_kwargs["verify_ssl"] is False
class TestEmptyRegistry:
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_empty_catalog_with_explicit_images(self, mock_factory):
adapter = MagicMock()
adapter.list_repositories.return_value = []
mock_factory.return_value = adapter
provider = _build_provider(images=["nginx:latest"])
assert provider.images == ["nginx:latest"]
class TestRegistryList:
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_registry_list_prints_and_returns(self, mock_factory, capsys):
adapter = MagicMock()
adapter.list_repositories.return_value = ["app/frontend", "app/backend"]
adapter.list_tags.side_effect = [["latest", "v1.0"], ["latest"]]
mock_factory.return_value = adapter
provider = _build_provider(registry_list_images=True)
assert provider._listing_only is True
captured = capsys.readouterr()
assert "app/frontend" in captured.out
assert "app/backend" in captured.out
assert "latest" in captured.out
assert "v1.0" in captured.out
assert "2 repositories" in captured.out
assert "3 images" in captured.out
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_registry_list_respects_image_filter(self, mock_factory, capsys):
adapter = MagicMock()
adapter.list_repositories.return_value = ["prod/app", "dev/app"]
adapter.list_tags.return_value = ["latest"]
mock_factory.return_value = adapter
provider = _build_provider(registry_list_images=True, image_filter="^prod/")
assert provider._listing_only is True
captured = capsys.readouterr()
assert "prod/app" in captured.out
assert "dev/app" not in captured.out
assert "1 repository" in captured.out
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_registry_list_respects_tag_filter(self, mock_factory, capsys):
adapter = MagicMock()
adapter.list_repositories.return_value = ["myapp"]
adapter.list_tags.return_value = ["latest", "v1.0", "dev-abc"]
mock_factory.return_value = adapter
provider = _build_provider(registry_list_images=True, tag_filter=r"^v\d+\.\d+$")
assert provider._listing_only is True
captured = capsys.readouterr()
assert "v1.0" in captured.out
assert "dev-abc" not in captured.out
assert "1 image)" in captured.out
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_registry_list_skips_max_images(self, mock_factory, capsys):
adapter = MagicMock()
adapter.list_repositories.return_value = ["app1", "app2", "app3"]
adapter.list_tags.return_value = ["latest", "v1.0"]
mock_factory.return_value = adapter
# max_images=1 would normally raise, but --registry-list skips it
provider = _build_provider(registry_list_images=True, max_images=1)
assert provider._listing_only is True
captured = capsys.readouterr()
assert "6 images" in captured.out
class TestDockerHubEnumeration:
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_dockerhub_images_use_repo_tag_format(self, mock_factory):
"""Docker Hub images should use repo:tag format without host prefix."""
adapter = MagicMock(spec=DockerHubAdapter)
adapter.list_repositories.return_value = ["myorg/app1", "myorg/app2"]
adapter.list_tags.side_effect = [["latest", "v1.0"], ["latest"]]
mock_factory.return_value = adapter
provider = _build_provider(registry="docker.io/myorg")
# Docker Hub images should NOT have host prefix
assert "myorg/app1:latest" in provider.images
assert "myorg/app1:v1.0" in provider.images
assert "myorg/app2:latest" in provider.images
# Ensure no host prefix was added
for img in provider.images:
assert not img.startswith("docker.io/"), f"Unexpected host prefix in {img}"
assert len(provider.images) == 3