mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-04-01 13:47:21 +00:00
Compare commits
2 Commits
chore/bump
...
feat/PROWL
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7ec21797cd | ||
|
|
97ed67853b |
@@ -5,7 +5,7 @@ LABEL maintainer="https://github.com/prowler-cloud/api"
|
||||
ARG POWERSHELL_VERSION=7.5.0
|
||||
ENV POWERSHELL_VERSION=${POWERSHELL_VERSION}
|
||||
|
||||
ARG TRIVY_VERSION=0.66.0
|
||||
ARG TRIVY_VERSION=0.69.1
|
||||
ENV TRIVY_VERSION=${TRIVY_VERSION}
|
||||
|
||||
# hadolint ignore=DL3008
|
||||
|
||||
40
api/src/backend/api/migrations/0081_image_provider.py
Normal file
40
api/src/backend/api/migrations/0081_image_provider.py
Normal file
@@ -0,0 +1,40 @@
|
||||
# Generated by Django migration for Image provider support
|
||||
|
||||
from django.db import migrations
|
||||
|
||||
import api.db_utils
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
dependencies = [
|
||||
("api", "0080_backfill_attack_paths_graph_data_ready"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AlterField(
|
||||
model_name="provider",
|
||||
name="provider",
|
||||
field=api.db_utils.ProviderEnumField(
|
||||
choices=[
|
||||
("aws", "AWS"),
|
||||
("azure", "Azure"),
|
||||
("gcp", "GCP"),
|
||||
("kubernetes", "Kubernetes"),
|
||||
("m365", "M365"),
|
||||
("github", "GitHub"),
|
||||
("mongodbatlas", "MongoDB Atlas"),
|
||||
("iac", "IaC"),
|
||||
("oraclecloud", "Oracle Cloud Infrastructure"),
|
||||
("alibabacloud", "Alibaba Cloud"),
|
||||
("cloudflare", "Cloudflare"),
|
||||
("openstack", "OpenStack"),
|
||||
("image", "Image"),
|
||||
],
|
||||
default="aws",
|
||||
),
|
||||
),
|
||||
migrations.RunSQL(
|
||||
"ALTER TYPE provider ADD VALUE IF NOT EXISTS 'image';",
|
||||
reverse_sql=migrations.RunSQL.noop,
|
||||
),
|
||||
]
|
||||
@@ -289,6 +289,7 @@ class Provider(RowLevelSecurityProtectedModel):
|
||||
ALIBABACLOUD = "alibabacloud", _("Alibaba Cloud")
|
||||
CLOUDFLARE = "cloudflare", _("Cloudflare")
|
||||
OPENSTACK = "openstack", _("OpenStack")
|
||||
IMAGE = "image", _("Image")
|
||||
|
||||
@staticmethod
|
||||
def validate_aws_uid(value):
|
||||
@@ -423,6 +424,27 @@ class Provider(RowLevelSecurityProtectedModel):
|
||||
pointer="/data/attributes/uid",
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def validate_image_uid(value):
|
||||
pattern = r"^[a-zA-Z0-9]([a-zA-Z0-9._-]*[a-zA-Z0-9])?(:\d{1,5})?(/[a-zA-Z0-9._-]+)*/?$"
|
||||
if not re.match(pattern, value):
|
||||
raise ModelValidationError(
|
||||
detail="Image provider ID must be a valid registry URL (e.g., ghcr.io, docker.io, "
|
||||
"123456789012.dkr.ecr.us-east-1.amazonaws.com).",
|
||||
code="image-uid",
|
||||
pointer="/data/attributes/uid",
|
||||
)
|
||||
# Validate port range if present
|
||||
port_match = re.search(r":(\d{1,5})(?=/|$)", value)
|
||||
if port_match:
|
||||
port = int(port_match.group(1))
|
||||
if port < 1 or port > 65535:
|
||||
raise ModelValidationError(
|
||||
detail="Image provider registry port must be between 1 and 65535.",
|
||||
code="image-uid",
|
||||
pointer="/data/attributes/uid",
|
||||
)
|
||||
|
||||
id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
|
||||
inserted_at = models.DateTimeField(auto_now_add=True, editable=False)
|
||||
updated_at = models.DateTimeField(auto_now=True, editable=False)
|
||||
@@ -446,6 +468,8 @@ class Provider(RowLevelSecurityProtectedModel):
|
||||
|
||||
def clean(self):
|
||||
super().clean()
|
||||
if self.provider == self.ProviderChoices.IMAGE.value and self.uid:
|
||||
self.uid = re.sub(r"^https?://", "", self.uid)
|
||||
getattr(self, f"validate_{self.provider}_uid")(self.uid)
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
|
||||
@@ -21,9 +21,11 @@ from prowler.providers.aws.aws_provider import AwsProvider
|
||||
from prowler.providers.aws.lib.security_hub.security_hub import SecurityHubConnection
|
||||
from prowler.providers.azure.azure_provider import AzureProvider
|
||||
from prowler.providers.cloudflare.cloudflare_provider import CloudflareProvider
|
||||
from prowler.providers.common.models import Connection
|
||||
from prowler.providers.gcp.gcp_provider import GcpProvider
|
||||
from prowler.providers.github.github_provider import GithubProvider
|
||||
from prowler.providers.iac.iac_provider import IacProvider
|
||||
from prowler.providers.image.image_provider import ImageProvider
|
||||
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
|
||||
from prowler.providers.m365.m365_provider import M365Provider
|
||||
from prowler.providers.mongodbatlas.mongodbatlas_provider import MongodbatlasProvider
|
||||
@@ -122,6 +124,7 @@ class TestReturnProwlerProvider:
|
||||
(Provider.ProviderChoices.ALIBABACLOUD.value, AlibabacloudProvider),
|
||||
(Provider.ProviderChoices.CLOUDFLARE.value, CloudflareProvider),
|
||||
(Provider.ProviderChoices.OPENSTACK.value, OpenstackProvider),
|
||||
(Provider.ProviderChoices.IMAGE.value, ImageProvider),
|
||||
],
|
||||
)
|
||||
def test_return_prowler_provider(self, provider_type, expected_provider):
|
||||
@@ -188,6 +191,51 @@ class TestProwlerProviderConnectionTest:
|
||||
assert isinstance(connection.error, Provider.secret.RelatedObjectDoesNotExist)
|
||||
assert str(connection.error) == "Provider has no secret."
|
||||
|
||||
@patch("api.utils.return_prowler_provider")
|
||||
def test_prowler_provider_connection_test_image_success(
|
||||
self, mock_return_prowler_provider
|
||||
):
|
||||
provider = MagicMock()
|
||||
provider.provider = Provider.ProviderChoices.IMAGE.value
|
||||
provider.uid = "ghcr.io"
|
||||
provider.secret.secret = {"registry_token": "tok"}
|
||||
|
||||
prowler_provider = MagicMock()
|
||||
prowler_provider.test_connection.return_value = Connection(is_connected=True)
|
||||
mock_return_prowler_provider.return_value = prowler_provider
|
||||
|
||||
connection = prowler_provider_connection_test(provider)
|
||||
|
||||
assert connection.is_connected is True
|
||||
assert connection.error is None
|
||||
prowler_provider.test_connection.assert_called_once_with(
|
||||
registry="ghcr.io",
|
||||
registry_username=None,
|
||||
registry_password=None,
|
||||
registry_token="tok",
|
||||
raise_on_exception=False,
|
||||
)
|
||||
|
||||
@patch("api.utils.return_prowler_provider")
|
||||
def test_prowler_provider_connection_test_image_failure(
|
||||
self, mock_return_prowler_provider
|
||||
):
|
||||
provider = MagicMock()
|
||||
provider.provider = Provider.ProviderChoices.IMAGE.value
|
||||
provider.uid = "ghcr.io"
|
||||
provider.secret.secret = {"registry_token": "bad-token"}
|
||||
|
||||
prowler_provider = MagicMock()
|
||||
prowler_provider.test_connection.return_value = Connection(
|
||||
is_connected=False, error="401 Unauthorized"
|
||||
)
|
||||
mock_return_prowler_provider.return_value = prowler_provider
|
||||
|
||||
connection = prowler_provider_connection_test(provider)
|
||||
|
||||
assert connection.is_connected is False
|
||||
assert connection.error == "401 Unauthorized"
|
||||
|
||||
|
||||
class TestGetProwlerProviderKwargs:
|
||||
@pytest.mark.parametrize(
|
||||
@@ -336,6 +384,103 @@ class TestGetProwlerProviderKwargs:
|
||||
}
|
||||
assert result == expected_result
|
||||
|
||||
def test_get_prowler_provider_kwargs_image_provider(self):
|
||||
"""Test that Image provider gets correct kwargs with registry URL and auth."""
|
||||
provider_uid = "ghcr.io"
|
||||
secret_dict = {
|
||||
"registry_username": "user",
|
||||
"registry_password": "pass",
|
||||
}
|
||||
secret_mock = MagicMock()
|
||||
secret_mock.secret = secret_dict
|
||||
|
||||
provider = MagicMock()
|
||||
provider.provider = Provider.ProviderChoices.IMAGE.value
|
||||
provider.secret = secret_mock
|
||||
provider.uid = provider_uid
|
||||
|
||||
result = get_prowler_provider_kwargs(provider)
|
||||
|
||||
expected_result = {
|
||||
"registry": provider_uid,
|
||||
"registry_username": "user",
|
||||
"registry_password": "pass",
|
||||
}
|
||||
assert result == expected_result
|
||||
|
||||
def test_get_prowler_provider_kwargs_image_provider_with_filters(self):
|
||||
"""Test that Image provider includes scan filters."""
|
||||
provider_uid = "docker.io"
|
||||
secret_dict = {
|
||||
"registry_token": "ghp_abc123",
|
||||
"image_filter": "my-app.*",
|
||||
"tag_filter": "v[0-9]+",
|
||||
"max_images": 50,
|
||||
}
|
||||
secret_mock = MagicMock()
|
||||
secret_mock.secret = secret_dict
|
||||
|
||||
provider = MagicMock()
|
||||
provider.provider = Provider.ProviderChoices.IMAGE.value
|
||||
provider.secret = secret_mock
|
||||
provider.uid = provider_uid
|
||||
|
||||
result = get_prowler_provider_kwargs(provider)
|
||||
|
||||
expected_result = {
|
||||
"registry": provider_uid,
|
||||
"registry_token": "ghp_abc123",
|
||||
"image_filter": "my-app.*",
|
||||
"tag_filter": "v[0-9]+",
|
||||
"max_images": 50,
|
||||
}
|
||||
assert result == expected_result
|
||||
|
||||
def test_get_prowler_provider_kwargs_image_provider_no_auth(self):
|
||||
"""Test that Image provider works with empty secret for public registries."""
|
||||
provider_uid = "docker.io"
|
||||
secret_dict = {}
|
||||
secret_mock = MagicMock()
|
||||
secret_mock.secret = secret_dict
|
||||
|
||||
provider = MagicMock()
|
||||
provider.provider = Provider.ProviderChoices.IMAGE.value
|
||||
provider.secret = secret_mock
|
||||
provider.uid = provider_uid
|
||||
|
||||
result = get_prowler_provider_kwargs(provider)
|
||||
|
||||
expected_result = {"registry": provider_uid}
|
||||
assert result == expected_result
|
||||
|
||||
def test_get_prowler_provider_kwargs_image_provider_ignores_mutelist(self):
|
||||
"""Test that Image provider does NOT receive mutelist_content.
|
||||
|
||||
Image provider uses Trivy's built-in logic, so it should not
|
||||
receive mutelist_content even when a mutelist processor is configured.
|
||||
"""
|
||||
provider_uid = "ghcr.io"
|
||||
secret_dict = {"registry_token": "test_token"}
|
||||
secret_mock = MagicMock()
|
||||
secret_mock.secret = secret_dict
|
||||
|
||||
mutelist_processor = MagicMock()
|
||||
mutelist_processor.configuration = {"Mutelist": {"key": "value"}}
|
||||
|
||||
provider = MagicMock()
|
||||
provider.provider = Provider.ProviderChoices.IMAGE.value
|
||||
provider.secret = secret_mock
|
||||
provider.uid = provider_uid
|
||||
|
||||
result = get_prowler_provider_kwargs(provider, mutelist_processor)
|
||||
|
||||
assert "mutelist_content" not in result
|
||||
expected_result = {
|
||||
"registry": provider_uid,
|
||||
"registry_token": "test_token",
|
||||
}
|
||||
assert result == expected_result
|
||||
|
||||
def test_get_prowler_provider_kwargs_unsupported_provider(self):
|
||||
# Setup
|
||||
provider_uid = "provider_uid"
|
||||
|
||||
@@ -1189,6 +1189,26 @@ class TestProviderViewSet:
|
||||
"uid": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
|
||||
"alias": "OpenStack Project",
|
||||
},
|
||||
{
|
||||
"provider": "image",
|
||||
"uid": "ghcr.io",
|
||||
"alias": "GitHub Registry",
|
||||
},
|
||||
{
|
||||
"provider": "image",
|
||||
"uid": "docker.io",
|
||||
"alias": "Docker Hub",
|
||||
},
|
||||
{
|
||||
"provider": "image",
|
||||
"uid": "123456789012.dkr.ecr.us-east-1.amazonaws.com",
|
||||
"alias": "ECR",
|
||||
},
|
||||
{
|
||||
"provider": "image",
|
||||
"uid": "registry.example.com:5000",
|
||||
"alias": "Private",
|
||||
},
|
||||
]
|
||||
),
|
||||
)
|
||||
@@ -1202,6 +1222,24 @@ class TestProviderViewSet:
|
||||
assert Provider.objects.get().uid == provider_json_payload["uid"]
|
||||
assert Provider.objects.get().alias == provider_json_payload["alias"]
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"uid_input, expected_uid",
|
||||
[
|
||||
("https://registry.example.com:5000", "registry.example.com:5000"),
|
||||
("http://registry.local:8080", "registry.local:8080"),
|
||||
],
|
||||
)
|
||||
def test_providers_create_image_strips_http_prefix(
|
||||
self, authenticated_client, uid_input, expected_uid
|
||||
):
|
||||
response = authenticated_client.post(
|
||||
reverse("provider-list"),
|
||||
data={"provider": "image", "uid": uid_input, "alias": "test"},
|
||||
format="json",
|
||||
)
|
||||
assert response.status_code == status.HTTP_201_CREATED
|
||||
assert Provider.objects.get().uid == expected_uid
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"provider_json_payload",
|
||||
(
|
||||
@@ -1633,6 +1671,26 @@ class TestProviderViewSet:
|
||||
"min_length",
|
||||
"uid",
|
||||
),
|
||||
# Image UID validation - too short (below min_length)
|
||||
(
|
||||
{
|
||||
"provider": "image",
|
||||
"uid": "ab",
|
||||
"alias": "test",
|
||||
},
|
||||
"min_length",
|
||||
"uid",
|
||||
),
|
||||
# Image UID validation - invalid characters (space)
|
||||
(
|
||||
{
|
||||
"provider": "image",
|
||||
"uid": "not valid!",
|
||||
"alias": "test",
|
||||
},
|
||||
"image-uid",
|
||||
"uid",
|
||||
),
|
||||
]
|
||||
),
|
||||
)
|
||||
@@ -1788,6 +1846,38 @@ class TestProviderViewSet:
|
||||
assert "Content-Location" in response.headers
|
||||
assert response.headers["Content-Location"] == f"/api/v1/tasks/{task_mock.id}"
|
||||
|
||||
@patch("api.v1.views.Task.objects.get")
|
||||
@patch("api.v1.views.check_provider_connection_task.delay")
|
||||
def test_providers_connection_image(
|
||||
self,
|
||||
mock_provider_connection,
|
||||
mock_task_get,
|
||||
authenticated_client,
|
||||
providers_fixture,
|
||||
tasks_fixture,
|
||||
):
|
||||
prowler_task = tasks_fixture[0]
|
||||
task_mock = Mock()
|
||||
task_mock.id = prowler_task.id
|
||||
task_mock.status = "PENDING"
|
||||
mock_provider_connection.return_value = task_mock
|
||||
mock_task_get.return_value = prowler_task
|
||||
|
||||
image_provider = providers_fixture[11]
|
||||
assert image_provider.provider == "image"
|
||||
assert image_provider.connected is None
|
||||
assert image_provider.connection_last_checked_at is None
|
||||
|
||||
response = authenticated_client.post(
|
||||
reverse("provider-connection", kwargs={"pk": image_provider.id})
|
||||
)
|
||||
assert response.status_code == status.HTTP_202_ACCEPTED
|
||||
mock_provider_connection.assert_called_once_with(
|
||||
provider_id=str(image_provider.id), tenant_id=ANY
|
||||
)
|
||||
assert "Content-Location" in response.headers
|
||||
assert response.headers["Content-Location"] == f"/api/v1/tasks/{task_mock.id}"
|
||||
|
||||
def test_providers_connection_invalid_provider(
|
||||
self, authenticated_client, providers_fixture
|
||||
):
|
||||
@@ -1810,17 +1900,17 @@ class TestProviderViewSet:
|
||||
),
|
||||
("alias", "aws_testing_1", 1),
|
||||
("alias.icontains", "aws", 2),
|
||||
("inserted_at", TODAY, 11),
|
||||
("inserted_at", TODAY, 12),
|
||||
(
|
||||
"inserted_at.gte",
|
||||
"2024-01-01",
|
||||
11,
|
||||
12,
|
||||
),
|
||||
("inserted_at.lte", "2024-01-01", 0),
|
||||
(
|
||||
"updated_at.gte",
|
||||
"2024-01-01",
|
||||
11,
|
||||
12,
|
||||
),
|
||||
("updated_at.lte", "2024-01-01", 0),
|
||||
]
|
||||
@@ -2436,6 +2526,39 @@ class TestProviderSecretViewSet:
|
||||
"clouds_yaml_cloud": "mycloud",
|
||||
},
|
||||
),
|
||||
# Image with Docker login
|
||||
(
|
||||
Provider.ProviderChoices.IMAGE.value,
|
||||
ProviderSecret.TypeChoices.STATIC,
|
||||
{
|
||||
"registry_username": "user",
|
||||
"registry_password": "pass",
|
||||
},
|
||||
),
|
||||
# Image with token
|
||||
(
|
||||
Provider.ProviderChoices.IMAGE.value,
|
||||
ProviderSecret.TypeChoices.STATIC,
|
||||
{
|
||||
"registry_token": "ghp_abc123",
|
||||
},
|
||||
),
|
||||
# Image with no auth + filters
|
||||
(
|
||||
Provider.ProviderChoices.IMAGE.value,
|
||||
ProviderSecret.TypeChoices.STATIC,
|
||||
{
|
||||
"image_filter": "my-app.*",
|
||||
"tag_filter": "v[0-9]+",
|
||||
"max_images": 50,
|
||||
},
|
||||
),
|
||||
# Image with empty secret (public registry)
|
||||
(
|
||||
Provider.ProviderChoices.IMAGE.value,
|
||||
ProviderSecret.TypeChoices.STATIC,
|
||||
{},
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_provider_secrets_create_valid(
|
||||
|
||||
@@ -28,6 +28,7 @@ if TYPE_CHECKING:
|
||||
from prowler.providers.gcp.gcp_provider import GcpProvider
|
||||
from prowler.providers.github.github_provider import GithubProvider
|
||||
from prowler.providers.iac.iac_provider import IacProvider
|
||||
from prowler.providers.image.image_provider import ImageProvider
|
||||
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
|
||||
from prowler.providers.m365.m365_provider import M365Provider
|
||||
from prowler.providers.mongodbatlas.mongodbatlas_provider import (
|
||||
@@ -83,6 +84,7 @@ def return_prowler_provider(
|
||||
| GcpProvider
|
||||
| GithubProvider
|
||||
| IacProvider
|
||||
| ImageProvider
|
||||
| KubernetesProvider
|
||||
| M365Provider
|
||||
| MongodbatlasProvider
|
||||
@@ -95,7 +97,7 @@ def return_prowler_provider(
|
||||
provider (Provider): The provider object containing the provider type and associated secrets.
|
||||
|
||||
Returns:
|
||||
AlibabacloudProvider | AwsProvider | AzureProvider | CloudflareProvider | GcpProvider | GithubProvider | IacProvider | KubernetesProvider | M365Provider | MongodbatlasProvider | OpenstackProvider | OraclecloudProvider: The corresponding provider class.
|
||||
AlibabacloudProvider | AwsProvider | AzureProvider | CloudflareProvider | GcpProvider | GithubProvider | IacProvider | ImageProvider | KubernetesProvider | M365Provider | MongodbatlasProvider | OpenstackProvider | OraclecloudProvider: The corresponding provider class.
|
||||
|
||||
Raises:
|
||||
ValueError: If the provider type specified in `provider.provider` is not supported.
|
||||
@@ -159,6 +161,10 @@ def return_prowler_provider(
|
||||
from prowler.providers.openstack.openstack_provider import OpenstackProvider
|
||||
|
||||
prowler_provider = OpenstackProvider
|
||||
case Provider.ProviderChoices.IMAGE.value:
|
||||
from prowler.providers.image.image_provider import ImageProvider
|
||||
|
||||
prowler_provider = ImageProvider
|
||||
case _:
|
||||
raise ValueError(f"Provider type {provider.provider} not supported")
|
||||
return prowler_provider
|
||||
@@ -221,11 +227,29 @@ def get_prowler_provider_kwargs(
|
||||
# clouds.yaml is not feasible because not all auth methods include it and the
|
||||
# Keystone API is unavailable on public clouds.
|
||||
pass
|
||||
elif provider.provider == Provider.ProviderChoices.IMAGE.value:
|
||||
prowler_provider_kwargs = {"registry": provider.uid}
|
||||
for key in (
|
||||
"registry_username",
|
||||
"registry_password",
|
||||
"registry_token",
|
||||
"image_filter",
|
||||
"tag_filter",
|
||||
):
|
||||
if key in provider.secret.secret:
|
||||
prowler_provider_kwargs[key] = provider.secret.secret[key]
|
||||
if "max_images" in provider.secret.secret:
|
||||
prowler_provider_kwargs["max_images"] = int(
|
||||
provider.secret.secret["max_images"]
|
||||
)
|
||||
|
||||
if mutelist_processor:
|
||||
mutelist_content = mutelist_processor.configuration.get("Mutelist", {})
|
||||
# IaC provider doesn't support mutelist (uses Trivy's built-in logic)
|
||||
if mutelist_content and provider.provider != Provider.ProviderChoices.IAC.value:
|
||||
# IaC/Image providers don't support mutelist (use Trivy's built-in logic)
|
||||
if mutelist_content and provider.provider not in (
|
||||
Provider.ProviderChoices.IAC.value,
|
||||
Provider.ProviderChoices.IMAGE.value,
|
||||
):
|
||||
prowler_provider_kwargs["mutelist_content"] = mutelist_content
|
||||
|
||||
return prowler_provider_kwargs
|
||||
@@ -242,6 +266,7 @@ def initialize_prowler_provider(
|
||||
| GcpProvider
|
||||
| GithubProvider
|
||||
| IacProvider
|
||||
| ImageProvider
|
||||
| KubernetesProvider
|
||||
| M365Provider
|
||||
| MongodbatlasProvider
|
||||
@@ -255,7 +280,7 @@ def initialize_prowler_provider(
|
||||
mutelist_processor (Processor): The mutelist processor object containing the mutelist configuration.
|
||||
|
||||
Returns:
|
||||
AlibabacloudProvider | AwsProvider | AzureProvider | CloudflareProvider | GcpProvider | GithubProvider | IacProvider | KubernetesProvider | M365Provider | MongodbatlasProvider | OpenstackProvider | OraclecloudProvider: An instance of the corresponding provider class
|
||||
AlibabacloudProvider | AwsProvider | AzureProvider | CloudflareProvider | GcpProvider | GithubProvider | IacProvider | ImageProvider | KubernetesProvider | M365Provider | MongodbatlasProvider | OpenstackProvider | OraclecloudProvider: An instance of the corresponding provider class
|
||||
initialized with the provider's secrets.
|
||||
"""
|
||||
prowler_provider = return_prowler_provider(provider)
|
||||
@@ -297,6 +322,14 @@ def prowler_provider_connection_test(provider: Provider) -> Connection:
|
||||
"raise_on_exception": False,
|
||||
}
|
||||
return prowler_provider.test_connection(**openstack_kwargs)
|
||||
elif provider.provider == Provider.ProviderChoices.IMAGE.value:
|
||||
return prowler_provider.test_connection(
|
||||
registry=provider.uid,
|
||||
registry_username=prowler_provider_kwargs.get("registry_username"),
|
||||
registry_password=prowler_provider_kwargs.get("registry_password"),
|
||||
registry_token=prowler_provider_kwargs.get("registry_token"),
|
||||
raise_on_exception=False,
|
||||
)
|
||||
else:
|
||||
return prowler_provider.test_connection(
|
||||
**prowler_provider_kwargs,
|
||||
|
||||
@@ -388,6 +388,37 @@ from rest_framework_json_api import serializers
|
||||
},
|
||||
"required": ["clouds_yaml_content", "clouds_yaml_cloud"],
|
||||
},
|
||||
{
|
||||
"type": "object",
|
||||
"title": "Image Registry Credentials",
|
||||
"properties": {
|
||||
"registry_username": {
|
||||
"type": "string",
|
||||
"description": "Username for Docker login authentication.",
|
||||
},
|
||||
"registry_password": {
|
||||
"type": "string",
|
||||
"description": "Password for Docker login authentication.",
|
||||
},
|
||||
"registry_token": {
|
||||
"type": "string",
|
||||
"description": "Bearer token for registry authentication.",
|
||||
},
|
||||
"image_filter": {
|
||||
"type": "string",
|
||||
"description": "Regex pattern to filter repository names during registry enumeration.",
|
||||
},
|
||||
"tag_filter": {
|
||||
"type": "string",
|
||||
"description": "Regex pattern to filter image tags during registry enumeration.",
|
||||
},
|
||||
"max_images": {
|
||||
"type": "integer",
|
||||
"minimum": 0,
|
||||
"description": "Maximum number of images to scan (0 = unlimited).",
|
||||
},
|
||||
},
|
||||
},
|
||||
]
|
||||
}
|
||||
)
|
||||
|
||||
@@ -1528,6 +1528,8 @@ class BaseWriteProviderSecretSerializer(BaseWriteSerializer):
|
||||
)
|
||||
elif provider_type == Provider.ProviderChoices.OPENSTACK.value:
|
||||
serializer = OpenStackCloudsYamlProviderSecret(data=secret)
|
||||
elif provider_type == Provider.ProviderChoices.IMAGE.value:
|
||||
serializer = ImageProviderSecret(data=secret)
|
||||
else:
|
||||
raise serializers.ValidationError(
|
||||
{"provider": f"Provider type not supported {provider_type}"}
|
||||
@@ -1666,6 +1668,41 @@ class IacProviderSecret(serializers.Serializer):
|
||||
resource_name = "provider-secrets"
|
||||
|
||||
|
||||
class ImageProviderSecret(serializers.Serializer):
|
||||
registry_username = serializers.CharField(required=False)
|
||||
registry_password = serializers.CharField(required=False)
|
||||
registry_token = serializers.CharField(required=False)
|
||||
image_filter = serializers.CharField(required=False)
|
||||
tag_filter = serializers.CharField(required=False)
|
||||
max_images = serializers.IntegerField(required=False, min_value=0)
|
||||
|
||||
def validate(self, attrs):
|
||||
username = attrs.get("registry_username")
|
||||
password = attrs.get("registry_password")
|
||||
token = attrs.get("registry_token")
|
||||
|
||||
if token and (username or password):
|
||||
raise serializers.ValidationError(
|
||||
{"secret": "Provide either username/password or token, not both."}
|
||||
)
|
||||
if username and not password:
|
||||
raise serializers.ValidationError(
|
||||
{
|
||||
"secret/registry_password": "Password is required when username is provided."
|
||||
}
|
||||
)
|
||||
if password and not username:
|
||||
raise serializers.ValidationError(
|
||||
{
|
||||
"secret/registry_username": "Username is required when password is provided."
|
||||
}
|
||||
)
|
||||
return attrs
|
||||
|
||||
class Meta:
|
||||
resource_name = "provider-secrets"
|
||||
|
||||
|
||||
class OracleCloudProviderSecret(serializers.Serializer):
|
||||
user = serializers.CharField()
|
||||
fingerprint = serializers.CharField()
|
||||
|
||||
@@ -543,6 +543,12 @@ def providers_fixture(tenants_fixture):
|
||||
alias="openstack_testing",
|
||||
tenant_id=tenant.id,
|
||||
)
|
||||
provider12 = Provider.objects.create(
|
||||
provider="image",
|
||||
uid="ghcr.io",
|
||||
alias="image_testing",
|
||||
tenant_id=tenant.id,
|
||||
)
|
||||
|
||||
return (
|
||||
provider1,
|
||||
@@ -556,6 +562,7 @@ def providers_fixture(tenants_fixture):
|
||||
provider9,
|
||||
provider10,
|
||||
provider11,
|
||||
provider12,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -137,6 +137,9 @@ COMPLIANCE_CLASS_MAP = {
|
||||
# IaC provider doesn't have specific compliance frameworks yet
|
||||
# Trivy handles its own compliance checks
|
||||
],
|
||||
"image": [
|
||||
# Image provider doesn't have compliance frameworks yet
|
||||
],
|
||||
"oraclecloud": [
|
||||
(lambda name: name.startswith("cis_"), OracleCloudCIS),
|
||||
(lambda name: name.startswith("csa_"), OracleCloudCSA),
|
||||
|
||||
@@ -11,8 +11,8 @@ from django_celery_beat.models import PeriodicTask
|
||||
from tasks.jobs.attack_paths import (
|
||||
attack_paths_scan,
|
||||
can_provider_run_attack_paths_scan,
|
||||
db_utils as attack_paths_db_utils,
|
||||
)
|
||||
from tasks.jobs.attack_paths import db_utils as attack_paths_db_utils
|
||||
from tasks.jobs.backfill import (
|
||||
backfill_compliance_summaries,
|
||||
backfill_daily_severity_summaries,
|
||||
@@ -134,13 +134,41 @@ def _perform_scan_complete_tasks(tenant_id: str, scan_id: str, provider_id: str)
|
||||
scan_id (str): The ID of the scan that was performed.
|
||||
provider_id (str): The primary key of the Provider instance that was scanned.
|
||||
"""
|
||||
chain(
|
||||
create_compliance_requirements_task.si(tenant_id=tenant_id, scan_id=scan_id),
|
||||
update_provider_compliance_scores_task.si(tenant_id=tenant_id, scan_id=scan_id),
|
||||
).apply_async()
|
||||
aggregate_attack_surface_task.apply_async(
|
||||
kwargs={"tenant_id": tenant_id, "scan_id": scan_id}
|
||||
with rls_transaction(tenant_id):
|
||||
provider_type = Provider.objects.get(id=provider_id).provider
|
||||
|
||||
has_compliance = provider_type not in (
|
||||
Provider.ProviderChoices.IAC.value,
|
||||
Provider.ProviderChoices.IMAGE.value,
|
||||
)
|
||||
|
||||
if has_compliance:
|
||||
chain(
|
||||
create_compliance_requirements_task.si(
|
||||
tenant_id=tenant_id, scan_id=scan_id
|
||||
),
|
||||
update_provider_compliance_scores_task.si(
|
||||
tenant_id=tenant_id, scan_id=scan_id
|
||||
),
|
||||
).apply_async()
|
||||
aggregate_attack_surface_task.apply_async(
|
||||
kwargs={"tenant_id": tenant_id, "scan_id": scan_id}
|
||||
)
|
||||
|
||||
final_group_tasks = [
|
||||
check_integrations_task.si(
|
||||
tenant_id=tenant_id,
|
||||
provider_id=provider_id,
|
||||
scan_id=scan_id,
|
||||
),
|
||||
]
|
||||
if has_compliance:
|
||||
final_group_tasks.append(
|
||||
generate_compliance_reports_task.si(
|
||||
tenant_id=tenant_id, scan_id=scan_id, provider_id=provider_id
|
||||
),
|
||||
)
|
||||
|
||||
chain(
|
||||
perform_scan_summary_task.si(tenant_id=tenant_id, scan_id=scan_id),
|
||||
group(
|
||||
@@ -149,17 +177,7 @@ def _perform_scan_complete_tasks(tenant_id: str, scan_id: str, provider_id: str)
|
||||
scan_id=scan_id, provider_id=provider_id, tenant_id=tenant_id
|
||||
),
|
||||
),
|
||||
group(
|
||||
# Use optimized task that generates both reports with shared queries
|
||||
generate_compliance_reports_task.si(
|
||||
tenant_id=tenant_id, scan_id=scan_id, provider_id=provider_id
|
||||
),
|
||||
check_integrations_task.si(
|
||||
tenant_id=tenant_id,
|
||||
provider_id=provider_id,
|
||||
scan_id=scan_id,
|
||||
),
|
||||
),
|
||||
group(*final_group_tasks),
|
||||
).apply_async()
|
||||
|
||||
if can_provider_run_attack_paths_scan(tenant_id, provider_id):
|
||||
|
||||
@@ -22,8 +22,8 @@ def load_checks_to_execute(
|
||||
) -> set:
|
||||
"""Generate the list of checks to execute based on the cloud provider and the input arguments given"""
|
||||
try:
|
||||
# Bypass check loading for IAC provider since it uses Trivy directly
|
||||
if provider == "iac":
|
||||
# Bypass check loading for IAC/Image providers since they use Trivy directly
|
||||
if provider in ("iac", "image"):
|
||||
return set()
|
||||
|
||||
# Local subsets
|
||||
|
||||
@@ -864,6 +864,7 @@ class CheckReportImage(Check_Report):
|
||||
resource_name: str
|
||||
resource_id: str
|
||||
image_digest: str
|
||||
image_sha: str
|
||||
package_name: str
|
||||
installed_version: str
|
||||
fixed_version: str
|
||||
@@ -895,6 +896,7 @@ class CheckReportImage(Check_Report):
|
||||
or finding.get("ID", "")
|
||||
)
|
||||
self.image_digest = finding.get("PkgID", "")
|
||||
self.image_sha = ""
|
||||
self.package_name = finding.get("PkgName", "")
|
||||
self.installed_version = finding.get("InstalledVersion", "")
|
||||
self.fixed_version = finding.get("FixedVersion", "")
|
||||
|
||||
@@ -384,10 +384,12 @@ class Finding(BaseModel):
|
||||
output_data["auth_method"] = provider.auth_method
|
||||
output_data["account_uid"] = "image"
|
||||
output_data["account_name"] = "image"
|
||||
output_data["resource_name"] = getattr(
|
||||
check_output, "resource_name", ""
|
||||
image_name = getattr(check_output, "resource_name", "")
|
||||
image_sha = getattr(check_output, "image_sha", "")
|
||||
output_data["resource_name"] = image_name
|
||||
output_data["resource_uid"] = (
|
||||
f"{image_name}:{image_sha}" if image_sha else image_name
|
||||
)
|
||||
output_data["resource_uid"] = getattr(check_output, "resource_id", "")
|
||||
output_data["region"] = getattr(check_output, "region", "container")
|
||||
output_data["package_name"] = getattr(check_output, "package_name", "")
|
||||
output_data["installed_version"] = getattr(
|
||||
|
||||
@@ -27,6 +27,7 @@ from prowler.lib.scan.exceptions.exceptions import (
|
||||
from prowler.providers.common.models import Audit_Metadata, ProviderOutputOptions
|
||||
from prowler.providers.common.provider import Provider
|
||||
from prowler.providers.iac.iac_provider import IacProvider
|
||||
from prowler.providers.image.image_provider import ImageProvider
|
||||
|
||||
|
||||
class Scan:
|
||||
@@ -92,10 +93,10 @@ class Scan:
|
||||
except ValueError:
|
||||
raise ScanInvalidStatusError(f"Invalid status provided: {s}.")
|
||||
|
||||
# Special setup for IaC provider - override inputs to work with traditional flow
|
||||
if provider.type == "iac":
|
||||
# IaC doesn't use traditional Prowler checks, so clear all input parameters
|
||||
# to avoid validation errors and let it flow through the normal logic
|
||||
# Special setup for IaC/Image providers - override inputs to work with traditional flow
|
||||
if provider.type in ("iac", "image"):
|
||||
# These providers don't use traditional Prowler checks, so clear all input
|
||||
# parameters to avoid validation errors and let them flow through the normal logic
|
||||
checks = None
|
||||
services = None
|
||||
excluded_checks = None
|
||||
@@ -160,8 +161,8 @@ class Scan:
|
||||
)
|
||||
|
||||
# Load checks to execute
|
||||
if provider.type == "iac":
|
||||
self._checks_to_execute = ["iac_scan"] # Dummy check name for IaC
|
||||
if provider.type in ("iac", "image"):
|
||||
self._checks_to_execute = [f"{provider.type}_scan"] # Dummy check name
|
||||
else:
|
||||
self._checks_to_execute = sorted(
|
||||
load_checks_to_execute(
|
||||
@@ -200,8 +201,8 @@ class Scan:
|
||||
self._number_of_checks_to_execute = len(self._checks_to_execute)
|
||||
|
||||
# Set up service-based checks tracking
|
||||
if provider.type == "iac":
|
||||
service_checks_to_execute = {"iac": set(["iac_scan"])}
|
||||
if provider.type in ("iac", "image"):
|
||||
service_checks_to_execute = {provider.type: set([f"{provider.type}_scan"])}
|
||||
else:
|
||||
service_checks_to_execute = get_service_checks_to_execute(
|
||||
self._checks_to_execute
|
||||
@@ -346,6 +347,68 @@ class Scan:
|
||||
self._duration = int((end_time - start_time).total_seconds())
|
||||
return
|
||||
|
||||
# Special handling for Image provider
|
||||
elif self._provider.type == "image":
|
||||
if isinstance(self._provider, ImageProvider):
|
||||
logger.info("Running Image scan with Trivy...")
|
||||
|
||||
total_images = len(self._provider.images)
|
||||
self._number_of_checks_to_execute = max(total_images, 1)
|
||||
|
||||
for i, (image_name, image_reports) in enumerate(
|
||||
self._provider.scan_per_image()
|
||||
):
|
||||
# Build resource UID from image name + SHA (all reports share the same SHA)
|
||||
image_sha = image_reports[0].image_sha if image_reports else ""
|
||||
resource_uid = (
|
||||
f"{image_name}:{image_sha}" if image_sha else image_name
|
||||
)
|
||||
|
||||
findings = []
|
||||
for report in image_reports:
|
||||
finding_uid = (
|
||||
f"{report.check_metadata.CheckID}"
|
||||
f"-{image_name}"
|
||||
f"-{report.resource_id}"
|
||||
)
|
||||
status_enum = (
|
||||
Status.FAIL if report.status == "FAIL" else Status.PASS
|
||||
)
|
||||
if report.muted:
|
||||
status_enum = Status.MUTED
|
||||
|
||||
finding = Finding(
|
||||
auth_method="Registry",
|
||||
timestamp=datetime.datetime.now(timezone.utc),
|
||||
account_uid=getattr(self._provider, "registry", None)
|
||||
or "image",
|
||||
account_name="Container Registry",
|
||||
metadata=report.check_metadata,
|
||||
uid=finding_uid,
|
||||
status=status_enum,
|
||||
status_extended=report.status_extended,
|
||||
muted=report.muted,
|
||||
resource_uid=resource_uid,
|
||||
resource_metadata=report.resource,
|
||||
resource_name=image_name,
|
||||
resource_details=report.resource_details,
|
||||
resource_tags={},
|
||||
region=report.region,
|
||||
compliance={},
|
||||
raw=report.resource,
|
||||
)
|
||||
findings.append(finding)
|
||||
|
||||
if self._status:
|
||||
findings = [f for f in findings if f.status in self._status]
|
||||
|
||||
self._number_of_checks_completed = i + 1
|
||||
yield (self.progress, findings)
|
||||
|
||||
end_time = datetime.datetime.now()
|
||||
self._duration = int((end_time - start_time).total_seconds())
|
||||
return
|
||||
|
||||
for check_name in checks_to_execute:
|
||||
try:
|
||||
# Recover service from check name
|
||||
|
||||
@@ -96,6 +96,7 @@ class ImageProvider(Provider):
|
||||
self.audited_account = "image-scan"
|
||||
self._session = None
|
||||
self._identity = "prowler"
|
||||
self._listing_only = False
|
||||
|
||||
# Registry authentication (follows IaC pattern: explicit params, env vars internal)
|
||||
self.registry_username = registry_username or os.environ.get(
|
||||
@@ -107,8 +108,8 @@ class ImageProvider(Provider):
|
||||
self.registry_token = registry_token or os.environ.get("REGISTRY_TOKEN")
|
||||
|
||||
if self.registry_username and self.registry_password:
|
||||
self._auth_method = "Basic auth"
|
||||
logger.info("Using basic auth for registry authentication")
|
||||
self._auth_method = "Docker login"
|
||||
logger.info("Using docker login for registry authentication")
|
||||
elif self.registry_token:
|
||||
self._auth_method = "Registry token"
|
||||
logger.info("Using registry token for authentication")
|
||||
@@ -152,6 +153,8 @@ class ImageProvider(Provider):
|
||||
# Registry scan mode: enumerate images from registry
|
||||
if self.registry:
|
||||
self._enumerate_registry()
|
||||
if self._listing_only:
|
||||
return
|
||||
|
||||
for image in self.images:
|
||||
self._validate_image_name(image)
|
||||
@@ -323,36 +326,44 @@ class ImageProvider(Provider):
|
||||
"""Clean up any resources after scanning."""
|
||||
|
||||
def _process_finding(
|
||||
self, finding: dict, image_name: str, finding_type: str
|
||||
self,
|
||||
finding: dict,
|
||||
image: str,
|
||||
trivy_target: str,
|
||||
image_sha: str = "",
|
||||
) -> CheckReportImage:
|
||||
"""
|
||||
Process a single finding and create a CheckReportImage object.
|
||||
|
||||
Args:
|
||||
finding: The finding object from Trivy output
|
||||
image_name: The container image name being scanned
|
||||
finding_type: The type of finding (Vulnerability, Secret, etc.)
|
||||
image: The clean container image name (e.g., "alpine:3.18")
|
||||
trivy_target: The Trivy target string (e.g., "alpine:3.18 (alpine 3.18.0)")
|
||||
image_sha: Short SHA from Trivy Metadata.ImageID for resource uniqueness
|
||||
|
||||
Returns:
|
||||
CheckReportImage: The processed check report
|
||||
"""
|
||||
try:
|
||||
# Determine finding ID based on type
|
||||
# Determine finding ID and category based on type
|
||||
if "VulnerabilityID" in finding:
|
||||
finding_id = finding["VulnerabilityID"]
|
||||
finding_description = finding.get(
|
||||
"Description", finding.get("Title", "")
|
||||
)
|
||||
finding_status = "FAIL"
|
||||
finding_categories = ["vulnerability"]
|
||||
elif "RuleID" in finding:
|
||||
# Secret finding
|
||||
finding_id = finding["RuleID"]
|
||||
finding_description = finding.get("Title", "Secret detected")
|
||||
finding_status = "FAIL"
|
||||
finding_categories = ["secrets"]
|
||||
else:
|
||||
finding_id = finding.get("ID", "UNKNOWN")
|
||||
finding_description = finding.get("Description", "")
|
||||
finding_status = finding.get("Status", "FAIL")
|
||||
finding_categories = []
|
||||
|
||||
# Build remediation text for vulnerabilities
|
||||
remediation_text = ""
|
||||
@@ -371,7 +382,7 @@ class ImageProvider(Provider):
|
||||
"CheckID": finding_id,
|
||||
"CheckTitle": finding.get("Title", finding_id),
|
||||
"CheckType": ["Container Image Security"],
|
||||
"ServiceName": finding_type,
|
||||
"ServiceName": "container-image",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "",
|
||||
"Severity": trivy_severity,
|
||||
@@ -381,7 +392,7 @@ class ImageProvider(Provider):
|
||||
"Risk": finding.get(
|
||||
"Description", "Vulnerability detected in container image"
|
||||
),
|
||||
"RelatedUrl": finding.get("PrimaryURL", ""),
|
||||
"RelatedUrl": "",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"NativeIaC": "",
|
||||
@@ -394,7 +405,7 @@ class ImageProvider(Provider):
|
||||
"Url": finding.get("PrimaryURL", ""),
|
||||
},
|
||||
},
|
||||
"Categories": [],
|
||||
"Categories": finding_categories,
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": "",
|
||||
@@ -404,11 +415,13 @@ class ImageProvider(Provider):
|
||||
metadata = json.dumps(metadata_dict)
|
||||
|
||||
report = CheckReportImage(
|
||||
metadata=metadata, finding=finding, image_name=image_name
|
||||
metadata=metadata, finding=finding, image_name=image
|
||||
)
|
||||
report.status = finding_status
|
||||
report.status_extended = self._build_status_extended(finding)
|
||||
report.region = self.region
|
||||
report.image_sha = image_sha
|
||||
report.resource_details = trivy_target
|
||||
return report
|
||||
|
||||
except Exception as error:
|
||||
@@ -453,6 +466,29 @@ class ImageProvider(Provider):
|
||||
finally:
|
||||
self.cleanup()
|
||||
|
||||
def scan_per_image(
|
||||
self,
|
||||
) -> Generator[tuple[str, list[CheckReportImage]], None, None]:
|
||||
"""Scan images one by one, yielding (image_name, findings) per image.
|
||||
|
||||
Unlike run() which returns all findings at once, this method yields
|
||||
after each image completes, enabling progress tracking.
|
||||
"""
|
||||
try:
|
||||
for image in self.images:
|
||||
try:
|
||||
image_findings = []
|
||||
for batch in self._scan_single_image(image):
|
||||
image_findings.extend(batch)
|
||||
yield (image, image_findings)
|
||||
except (ImageScanError, ImageTrivyBinaryNotFoundError):
|
||||
raise
|
||||
except Exception as error:
|
||||
logger.error(f"Error scanning image {image}: {error}")
|
||||
continue
|
||||
finally:
|
||||
self.cleanup()
|
||||
|
||||
def run_scan(self) -> Generator[list[CheckReportImage], None, None]:
|
||||
"""
|
||||
Run Trivy scan on all configured images.
|
||||
@@ -534,6 +570,19 @@ class ImageProvider(Provider):
|
||||
logger.info(f"No findings for image: {image}")
|
||||
return
|
||||
|
||||
# Extract image digest for resource uniqueness
|
||||
trivy_metadata = output.get("Metadata", {})
|
||||
image_id = trivy_metadata.get("ImageID", "")
|
||||
if not image_id:
|
||||
repo_digests = trivy_metadata.get("RepoDigests", [])
|
||||
if repo_digests:
|
||||
image_id = (
|
||||
repo_digests[0].split("@")[-1]
|
||||
if "@" in repo_digests[0]
|
||||
else ""
|
||||
)
|
||||
short_sha = image_id.replace("sha256:", "")[:12] if image_id else ""
|
||||
|
||||
except json.JSONDecodeError as error:
|
||||
logger.error(f"Failed to parse Trivy output for {image}: {error}")
|
||||
logger.debug(f"Trivy stdout: {process.stdout[:500]}")
|
||||
@@ -544,11 +593,12 @@ class ImageProvider(Provider):
|
||||
|
||||
for result in results:
|
||||
target = result.get("Target", image)
|
||||
result_type = result.get("Type", "unknown")
|
||||
|
||||
# Process Vulnerabilities
|
||||
for vuln in result.get("Vulnerabilities", []):
|
||||
report = self._process_finding(vuln, target, result_type)
|
||||
report = self._process_finding(
|
||||
vuln, image, target, image_sha=short_sha
|
||||
)
|
||||
batch.append(report)
|
||||
if len(batch) >= self.FINDING_BATCH_SIZE:
|
||||
yield batch
|
||||
@@ -556,7 +606,9 @@ class ImageProvider(Provider):
|
||||
|
||||
# Process Secrets
|
||||
for secret in result.get("Secrets", []):
|
||||
report = self._process_finding(secret, target, "secret")
|
||||
report = self._process_finding(
|
||||
secret, image, target, image_sha=short_sha
|
||||
)
|
||||
batch.append(report)
|
||||
if len(batch) >= self.FINDING_BATCH_SIZE:
|
||||
yield batch
|
||||
@@ -565,7 +617,7 @@ class ImageProvider(Provider):
|
||||
# Process Misconfigurations (from Dockerfile)
|
||||
for misconfig in result.get("Misconfigurations", []):
|
||||
report = self._process_finding(
|
||||
misconfig, target, "misconfiguration"
|
||||
misconfig, image, target, image_sha=short_sha
|
||||
)
|
||||
batch.append(report)
|
||||
if len(batch) >= self.FINDING_BATCH_SIZE:
|
||||
@@ -679,7 +731,7 @@ class ImageProvider(Provider):
|
||||
lower = error_msg.lower()
|
||||
|
||||
if any(kw in lower for kw in ("401", "403", "unauthorized", "denied")):
|
||||
return f"Auth failure — check registry credentials: {error_msg}"
|
||||
return f"Auth failure — check `docker login`: {error_msg}"
|
||||
if any(kw in lower for kw in ("404", "manifest unknown", "not found")):
|
||||
return f"Image not found — check name/tag/registry: {error_msg}"
|
||||
if any(kw in lower for kw in ("429", "rate limit", "too many requests")):
|
||||
@@ -747,10 +799,11 @@ class ImageProvider(Provider):
|
||||
image_ref = f"{registry_host}/{repo}:{tag}"
|
||||
discovered_images.append(image_ref)
|
||||
|
||||
# Registry list mode: print listing and exit
|
||||
# Registry list mode: print listing and return early
|
||||
if self.registry_list_images:
|
||||
self._print_registry_listing(repos_tags, len(discovered_images))
|
||||
raise SystemExit(0)
|
||||
self._listing_only = True
|
||||
return
|
||||
|
||||
# Check max-images limit
|
||||
if self.max_images and len(discovered_images) > self.max_images:
|
||||
@@ -843,17 +896,23 @@ class ImageProvider(Provider):
|
||||
image: str | None = None,
|
||||
raise_on_exception: bool = True,
|
||||
provider_id: str | None = None,
|
||||
registry: str | None = None,
|
||||
registry_username: str | None = None,
|
||||
registry_password: str | None = None,
|
||||
registry_token: str | None = None,
|
||||
) -> "Connection":
|
||||
"""
|
||||
Test connection to container registry by attempting to inspect an image.
|
||||
Test connection to a container registry or individual image.
|
||||
|
||||
When ``registry`` is provided, validates registry-level connectivity
|
||||
by listing repositories through the registry adapter. Otherwise
|
||||
falls back to testing access to an individual ``image`` via Trivy.
|
||||
|
||||
Args:
|
||||
image: Container image to test
|
||||
image: Container image to test (individual image mode)
|
||||
raise_on_exception: Whether to raise exceptions
|
||||
provider_id: Fallback for image name
|
||||
registry: Registry URL for registry-level connectivity test
|
||||
registry_username: Registry username for basic auth
|
||||
registry_password: Registry password for basic auth
|
||||
registry_token: Registry token for token-based auth
|
||||
@@ -862,6 +921,22 @@ class ImageProvider(Provider):
|
||||
Connection: Connection object with success status
|
||||
"""
|
||||
try:
|
||||
# Registry-level connectivity test (used by API)
|
||||
if registry:
|
||||
from prowler.providers.image.lib.registry.factory import (
|
||||
create_registry_adapter,
|
||||
)
|
||||
|
||||
adapter = create_registry_adapter(
|
||||
registry_url=registry,
|
||||
username=registry_username,
|
||||
password=registry_password,
|
||||
token=registry_token,
|
||||
)
|
||||
adapter.list_repositories()
|
||||
return Connection(is_connected=True)
|
||||
|
||||
# Individual image test (existing behaviour)
|
||||
if provider_id and not image:
|
||||
image = provider_id
|
||||
|
||||
|
||||
@@ -416,3 +416,102 @@ class TestScan:
|
||||
results = list(scan.scan(custom_checks_metadata))
|
||||
|
||||
assert results[0] == (100.0, [])
|
||||
|
||||
|
||||
class TestImageScanPath:
|
||||
"""Tests for the IMAGE provider scan path in Scan.scan()."""
|
||||
|
||||
@staticmethod
|
||||
def _make_check_metadata(check_id):
|
||||
from prowler.lib.check.models import CheckMetadata
|
||||
|
||||
return CheckMetadata(
|
||||
Provider="image",
|
||||
CheckID=check_id,
|
||||
CheckTitle=f"Test {check_id}",
|
||||
CheckType=["Container Image Security"],
|
||||
ServiceName="vuln",
|
||||
SubServiceName="",
|
||||
ResourceIdTemplate="",
|
||||
Severity="high",
|
||||
ResourceType="container-image",
|
||||
ResourceGroup="container",
|
||||
Description="Test finding",
|
||||
Risk="Test risk",
|
||||
RelatedUrl="",
|
||||
Remediation={
|
||||
"Code": {"NativeIaC": "", "Terraform": "", "CLI": "", "Other": ""},
|
||||
"Recommendation": {"Text": "", "Url": ""},
|
||||
},
|
||||
Categories=["vulnerability"],
|
||||
DependsOn=[],
|
||||
RelatedTo=[],
|
||||
Notes="",
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _make_report(check_id, image_name, status="FAIL", image_sha="abc123def456"):
|
||||
report = MagicMock()
|
||||
report.check_metadata = TestImageScanPath._make_check_metadata(check_id)
|
||||
report.resource_name = image_name
|
||||
report.resource_id = f"{image_name}-resource"
|
||||
report.status = status
|
||||
report.status_extended = f"{check_id} found"
|
||||
report.muted = False
|
||||
report.resource = {}
|
||||
report.resource_details = ""
|
||||
report.region = "container"
|
||||
report.image_sha = image_sha
|
||||
return report
|
||||
|
||||
@pytest.fixture
|
||||
def mock_image_provider(self):
|
||||
"""Create a mock ImageProvider that passes isinstance checks."""
|
||||
from prowler.providers.image.image_provider import ImageProvider
|
||||
|
||||
provider = MagicMock(spec=ImageProvider)
|
||||
provider.type = "image"
|
||||
provider.images = ["img1:latest", "img2:latest"]
|
||||
provider.registry = None
|
||||
|
||||
r1 = self._make_report("CVE-2024-0001", "img1:latest")
|
||||
r2 = self._make_report("CVE-2024-0002", "img2:latest")
|
||||
provider.scan_per_image.return_value = iter(
|
||||
[("img1:latest", [r1]), ("img2:latest", [r2])]
|
||||
)
|
||||
return provider
|
||||
|
||||
def test_image_scan_yields_findings(self, mock_image_provider):
|
||||
"""Verify Finding objects are created with correct fields for each image."""
|
||||
scan = Scan(mock_image_provider)
|
||||
results = list(scan.scan({}))
|
||||
|
||||
assert len(results) == 2
|
||||
for _, findings in results:
|
||||
assert len(findings) == 1
|
||||
f = findings[0]
|
||||
assert f.status.name in ("PASS", "FAIL")
|
||||
assert f.auth_method == "Registry"
|
||||
assert f.account_name == "Container Registry"
|
||||
# resource_uid should be image_name:sha
|
||||
assert ":abc123def456" in f.resource_uid
|
||||
# resource_name should be clean image name
|
||||
assert f.resource_name in ("img1:latest", "img2:latest")
|
||||
|
||||
def test_image_scan_progress_tracking(self, mock_image_provider):
|
||||
"""Verify progress increments per image."""
|
||||
scan = Scan(mock_image_provider)
|
||||
results = list(scan.scan({}))
|
||||
|
||||
progresses = [p for p, _ in results]
|
||||
assert len(progresses) == 2
|
||||
assert progresses[0] < progresses[1]
|
||||
|
||||
def test_image_scan_status_filtering(self, mock_image_provider):
|
||||
"""Verify status filter excludes non-matching findings."""
|
||||
scan = Scan(mock_image_provider, status=["PASS"])
|
||||
results = list(scan.scan({}))
|
||||
|
||||
# All reports have status="FAIL", so filtering for PASS should yield empty findings
|
||||
for _, findings in results:
|
||||
assert len(findings) == 0
|
||||
|
||||
@@ -45,8 +45,16 @@ SAMPLE_UNKNOWN_SEVERITY_FINDING = {
|
||||
"Description": "An issue with unknown severity.",
|
||||
}
|
||||
|
||||
# Sample image SHA for testing (first 12 chars of a sha256 digest)
|
||||
SAMPLE_IMAGE_SHA = "c1aabb73d233"
|
||||
SAMPLE_IMAGE_ID = f"sha256:{SAMPLE_IMAGE_SHA}abcdef1234567890"
|
||||
|
||||
# Full Trivy JSON output structure with a single vulnerability
|
||||
SAMPLE_TRIVY_IMAGE_OUTPUT = {
|
||||
"Metadata": {
|
||||
"ImageID": SAMPLE_IMAGE_ID,
|
||||
"RepoDigests": [f"alpine@sha256:{SAMPLE_IMAGE_SHA}abcdef1234567890"],
|
||||
},
|
||||
"Results": [
|
||||
{
|
||||
"Target": "alpine:3.18 (alpine 3.18.0)",
|
||||
@@ -55,11 +63,15 @@ SAMPLE_TRIVY_IMAGE_OUTPUT = {
|
||||
"Secrets": [],
|
||||
"Misconfigurations": [],
|
||||
}
|
||||
]
|
||||
],
|
||||
}
|
||||
|
||||
# Full Trivy JSON output with mixed finding types
|
||||
SAMPLE_TRIVY_MULTI_TYPE_OUTPUT = {
|
||||
"Metadata": {
|
||||
"ImageID": SAMPLE_IMAGE_ID,
|
||||
"RepoDigests": [f"myimage@sha256:{SAMPLE_IMAGE_SHA}abcdef1234567890"],
|
||||
},
|
||||
"Results": [
|
||||
{
|
||||
"Target": "myimage:latest (debian 12)",
|
||||
@@ -68,7 +80,36 @@ SAMPLE_TRIVY_MULTI_TYPE_OUTPUT = {
|
||||
"Secrets": [SAMPLE_SECRET_FINDING],
|
||||
"Misconfigurations": [SAMPLE_MISCONFIGURATION_FINDING],
|
||||
}
|
||||
]
|
||||
],
|
||||
}
|
||||
|
||||
# Trivy output with only RepoDigests (no ImageID) for fallback testing
|
||||
SAMPLE_TRIVY_REPO_DIGEST_ONLY_OUTPUT = {
|
||||
"Metadata": {
|
||||
"RepoDigests": ["alpine@sha256:e5f6g7h8i9j0abcdef1234567890"],
|
||||
},
|
||||
"Results": [
|
||||
{
|
||||
"Target": "alpine:3.18 (alpine 3.18.0)",
|
||||
"Type": "alpine",
|
||||
"Vulnerabilities": [SAMPLE_VULNERABILITY_FINDING],
|
||||
"Secrets": [],
|
||||
"Misconfigurations": [],
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
# Trivy output with no Metadata at all
|
||||
SAMPLE_TRIVY_NO_METADATA_OUTPUT = {
|
||||
"Results": [
|
||||
{
|
||||
"Target": "alpine:3.18 (alpine 3.18.0)",
|
||||
"Type": "alpine",
|
||||
"Vulnerabilities": [SAMPLE_VULNERABILITY_FINDING],
|
||||
"Secrets": [],
|
||||
"Misconfigurations": [],
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
@@ -90,3 +131,13 @@ def get_invalid_trivy_output():
|
||||
def get_multi_type_trivy_output():
|
||||
"""Return Trivy output with multiple finding types as string."""
|
||||
return json.dumps(SAMPLE_TRIVY_MULTI_TYPE_OUTPUT)
|
||||
|
||||
|
||||
def get_repo_digest_only_trivy_output():
|
||||
"""Return Trivy output with only RepoDigests (no ImageID) as string."""
|
||||
return json.dumps(SAMPLE_TRIVY_REPO_DIGEST_ONLY_OUTPUT)
|
||||
|
||||
|
||||
def get_no_metadata_trivy_output():
|
||||
"""Return Trivy output with no Metadata as string."""
|
||||
return json.dumps(SAMPLE_TRIVY_NO_METADATA_OUTPUT)
|
||||
|
||||
@@ -20,6 +20,7 @@ from prowler.providers.image.exceptions.exceptions import (
|
||||
)
|
||||
from prowler.providers.image.image_provider import ImageProvider
|
||||
from tests.providers.image.image_fixtures import (
|
||||
SAMPLE_IMAGE_SHA,
|
||||
SAMPLE_MISCONFIGURATION_FINDING,
|
||||
SAMPLE_SECRET_FINDING,
|
||||
SAMPLE_UNKNOWN_SEVERITY_FINDING,
|
||||
@@ -27,6 +28,8 @@ from tests.providers.image.image_fixtures import (
|
||||
get_empty_trivy_output,
|
||||
get_invalid_trivy_output,
|
||||
get_multi_type_trivy_output,
|
||||
get_no_metadata_trivy_output,
|
||||
get_repo_digest_only_trivy_output,
|
||||
get_sample_trivy_json_output,
|
||||
)
|
||||
|
||||
@@ -42,10 +45,6 @@ def _make_provider(**kwargs):
|
||||
|
||||
|
||||
class TestImageProvider:
|
||||
@patch.dict(
|
||||
os.environ,
|
||||
{"REGISTRY_USERNAME": "", "REGISTRY_PASSWORD": "", "REGISTRY_TOKEN": ""},
|
||||
)
|
||||
def test_image_provider(self):
|
||||
"""Test default initialization."""
|
||||
provider = _make_provider()
|
||||
@@ -124,22 +123,27 @@ class TestImageProvider:
|
||||
provider = _make_provider()
|
||||
report = provider._process_finding(
|
||||
SAMPLE_VULNERABILITY_FINDING,
|
||||
"alpine:3.18",
|
||||
"alpine:3.18 (alpine 3.18.0)",
|
||||
"alpine",
|
||||
image_sha="c1aabb73d233",
|
||||
)
|
||||
|
||||
assert isinstance(report, CheckReportImage)
|
||||
assert report.status == "FAIL"
|
||||
assert report.check_metadata.CheckID == "CVE-2024-1234"
|
||||
assert report.check_metadata.Severity == "high"
|
||||
assert report.check_metadata.ServiceName == "alpine"
|
||||
assert report.check_metadata.ServiceName == "container-image"
|
||||
assert report.check_metadata.ResourceType == "container-image"
|
||||
assert report.check_metadata.ResourceGroup == "container"
|
||||
assert report.package_name == "openssl"
|
||||
assert report.installed_version == "1.1.1k-r0"
|
||||
assert report.fixed_version == "1.1.1l-r0"
|
||||
assert report.resource_name == "alpine:3.18 (alpine 3.18.0)"
|
||||
assert report.resource_name == "alpine:3.18"
|
||||
assert report.image_sha == "c1aabb73d233"
|
||||
assert report.resource_details == "alpine:3.18 (alpine 3.18.0)"
|
||||
assert report.region == "container"
|
||||
assert report.check_metadata.Categories == ["vulnerability"]
|
||||
assert report.check_metadata.RelatedUrl == ""
|
||||
|
||||
def test_process_finding_secret(self):
|
||||
"""Test processing a secret finding (identified by RuleID)."""
|
||||
@@ -147,14 +151,15 @@ class TestImageProvider:
|
||||
report = provider._process_finding(
|
||||
SAMPLE_SECRET_FINDING,
|
||||
"myimage:latest",
|
||||
"secret",
|
||||
"myimage:latest (debian 12)",
|
||||
)
|
||||
|
||||
assert isinstance(report, CheckReportImage)
|
||||
assert report.status == "FAIL"
|
||||
assert report.check_metadata.CheckID == "aws-access-key-id"
|
||||
assert report.check_metadata.Severity == "critical"
|
||||
assert report.check_metadata.ServiceName == "secret"
|
||||
assert report.check_metadata.ServiceName == "container-image"
|
||||
assert report.check_metadata.Categories == ["secrets"]
|
||||
|
||||
def test_process_finding_misconfiguration(self):
|
||||
"""Test processing a misconfiguration finding (identified by ID)."""
|
||||
@@ -162,13 +167,14 @@ class TestImageProvider:
|
||||
report = provider._process_finding(
|
||||
SAMPLE_MISCONFIGURATION_FINDING,
|
||||
"myimage:latest",
|
||||
"misconfiguration",
|
||||
"myimage:latest (debian 12)",
|
||||
)
|
||||
|
||||
assert isinstance(report, CheckReportImage)
|
||||
assert report.check_metadata.CheckID == "DS001"
|
||||
assert report.check_metadata.Severity == "medium"
|
||||
assert report.check_metadata.ServiceName == "misconfiguration"
|
||||
assert report.check_metadata.ServiceName == "container-image"
|
||||
assert report.check_metadata.Categories == []
|
||||
|
||||
def test_process_finding_unknown_severity(self):
|
||||
"""Test that UNKNOWN severity is mapped to informational."""
|
||||
@@ -176,7 +182,7 @@ class TestImageProvider:
|
||||
report = provider._process_finding(
|
||||
SAMPLE_UNKNOWN_SEVERITY_FINDING,
|
||||
"myimage:latest",
|
||||
"alpine",
|
||||
"myimage:latest (alpine 3.18.0)",
|
||||
)
|
||||
|
||||
assert report.check_metadata.Severity == "informational"
|
||||
@@ -195,6 +201,9 @@ class TestImageProvider:
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].check_metadata.CheckID == "CVE-2024-1234"
|
||||
assert reports[0].image_sha == SAMPLE_IMAGE_SHA
|
||||
assert reports[0].resource_name == "alpine:3.18"
|
||||
assert reports[0].check_metadata.ServiceName == "container-image"
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_empty_output(self, mock_subprocess):
|
||||
@@ -394,6 +403,51 @@ class TestImageProvider:
|
||||
for _ in provider._scan_single_image("private/image:latest"):
|
||||
pass
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_sha_extraction_from_image_id(self, mock_subprocess):
|
||||
"""Test that image_sha is extracted from Trivy Metadata.ImageID."""
|
||||
provider = _make_provider()
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
|
||||
reports = []
|
||||
for batch in provider._scan_single_image("alpine:3.18"):
|
||||
reports.extend(batch)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].image_sha == SAMPLE_IMAGE_SHA
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_sha_extraction_fallback_to_repo_digests(self, mock_subprocess):
|
||||
"""Test that image_sha falls back to RepoDigests when ImageID is absent."""
|
||||
provider = _make_provider()
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_repo_digest_only_trivy_output(), stderr=""
|
||||
)
|
||||
|
||||
reports = []
|
||||
for batch in provider._scan_single_image("alpine:3.18"):
|
||||
reports.extend(batch)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].image_sha == "e5f6g7h8i9j0"
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_sha_extraction_no_metadata(self, mock_subprocess):
|
||||
"""Test that image_sha is empty when no Metadata is present."""
|
||||
provider = _make_provider()
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_no_metadata_trivy_output(), stderr=""
|
||||
)
|
||||
|
||||
reports = []
|
||||
for batch in provider._scan_single_image("alpine:3.18"):
|
||||
reports.extend(batch)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].image_sha == ""
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_propagates_scan_error(self, mock_subprocess):
|
||||
"""Test that run_scan() re-raises ImageScanError instead of swallowing it."""
|
||||
@@ -409,17 +463,14 @@ class TestImageProvider:
|
||||
pass
|
||||
|
||||
|
||||
@patch.dict(
|
||||
os.environ, {"REGISTRY_USERNAME": "", "REGISTRY_PASSWORD": "", "REGISTRY_TOKEN": ""}
|
||||
)
|
||||
class TestImageProviderRegistryAuth:
|
||||
def test_no_auth_by_default(self):
|
||||
"""Test that no auth is set when no credentials are provided."""
|
||||
provider = _make_provider()
|
||||
|
||||
assert not provider.registry_username
|
||||
assert not provider.registry_password
|
||||
assert not provider.registry_token
|
||||
assert provider.registry_username is None
|
||||
assert provider.registry_password is None
|
||||
assert provider.registry_token is None
|
||||
assert provider.auth_method == "No auth"
|
||||
|
||||
def test_basic_auth_with_explicit_params(self):
|
||||
@@ -431,7 +482,7 @@ class TestImageProviderRegistryAuth:
|
||||
|
||||
assert provider.registry_username == "myuser"
|
||||
assert provider.registry_password == "mypass"
|
||||
assert provider.auth_method == "Basic auth"
|
||||
assert provider.auth_method == "Docker login"
|
||||
|
||||
def test_token_auth_with_explicit_param(self):
|
||||
"""Test token auth via explicit constructor param."""
|
||||
@@ -448,7 +499,7 @@ class TestImageProviderRegistryAuth:
|
||||
registry_token="my-token",
|
||||
)
|
||||
|
||||
assert provider.auth_method == "Basic auth"
|
||||
assert provider.auth_method == "Docker login"
|
||||
|
||||
@patch.dict(
|
||||
os.environ, {"REGISTRY_USERNAME": "envuser", "REGISTRY_PASSWORD": "envpass"}
|
||||
@@ -459,7 +510,7 @@ class TestImageProviderRegistryAuth:
|
||||
|
||||
assert provider.registry_username == "envuser"
|
||||
assert provider.registry_password == "envpass"
|
||||
assert provider.auth_method == "Basic auth"
|
||||
assert provider.auth_method == "Docker login"
|
||||
|
||||
@patch.dict(os.environ, {"REGISTRY_TOKEN": "env-token"})
|
||||
def test_token_auth_from_env_var(self):
|
||||
@@ -573,7 +624,7 @@ class TestImageProviderRegistryAuth:
|
||||
output = " ".join(
|
||||
str(call.args[0]) for call in mock_print.call_args_list if call.args
|
||||
)
|
||||
assert "Basic auth" in output
|
||||
assert "Docker login" in output
|
||||
|
||||
|
||||
class TestExtractRegistry:
|
||||
@@ -616,120 +667,13 @@ class TestExtractRegistry:
|
||||
assert ImageProvider._extract_registry("nginx") is None
|
||||
|
||||
|
||||
class TestTrivyAuthIntegration:
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_passes_trivy_env_with_credentials(self, mock_subprocess):
|
||||
"""Test that run_scan() passes TRIVY_USERNAME/PASSWORD via env when credentials are set."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
provider = _make_provider(
|
||||
images=["ghcr.io/user/image:tag"],
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
reports = []
|
||||
for batch in provider.run_scan():
|
||||
reports.extend(batch)
|
||||
|
||||
calls = mock_subprocess.call_args_list
|
||||
# Only trivy calls, no docker login/pull
|
||||
assert all(call.args[0][0] == "trivy" for call in calls)
|
||||
env = calls[0].kwargs.get("env") or calls[0][1].get("env")
|
||||
assert env["TRIVY_USERNAME"] == "myuser"
|
||||
assert env["TRIVY_PASSWORD"] == "mypass"
|
||||
|
||||
@patch.dict(os.environ, {"REGISTRY_USERNAME": "", "REGISTRY_PASSWORD": ""})
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_no_trivy_auth_without_credentials(self, mock_subprocess):
|
||||
"""Test that run_scan() does NOT set TRIVY_USERNAME/PASSWORD when no credentials."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
class TestCleanup:
|
||||
def test_cleanup_idempotent(self):
|
||||
"""Test cleanup is safe to call multiple times."""
|
||||
provider = _make_provider()
|
||||
|
||||
for batch in provider.run_scan():
|
||||
pass
|
||||
|
||||
calls = mock_subprocess.call_args_list
|
||||
assert all(call.args[0][0] == "trivy" for call in calls)
|
||||
|
||||
@patch.dict(os.environ, {"REGISTRY_USERNAME": "", "REGISTRY_PASSWORD": ""})
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_token_auth_via_env(self, mock_subprocess):
|
||||
"""Test that run_scan() passes TRIVY_REGISTRY_TOKEN when only token is provided."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
provider = _make_provider(registry_token="my-token")
|
||||
|
||||
for batch in provider.run_scan():
|
||||
pass
|
||||
|
||||
calls = mock_subprocess.call_args_list
|
||||
assert all(call.args[0][0] == "trivy" for call in calls)
|
||||
env = calls[0].kwargs.get("env") or calls[0][1].get("env")
|
||||
assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_run_with_credentials_only_calls_trivy(self, mock_subprocess):
|
||||
"""Test that run() only calls trivy (no docker login/pull/logout)."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
provider = _make_provider(
|
||||
images=["ghcr.io/user/image:tag"],
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
provider.run()
|
||||
|
||||
calls = mock_subprocess.call_args_list
|
||||
assert all(call.args[0][0] == "trivy" for call in calls)
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_multiple_images_all_get_trivy_env(self, mock_subprocess):
|
||||
"""Test that all trivy calls get TRIVY_USERNAME/PASSWORD when scanning multiple images."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
provider = _make_provider(
|
||||
images=["ghcr.io/user/image1:tag", "ghcr.io/user/image2:tag"],
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
for batch in provider.run_scan():
|
||||
pass
|
||||
|
||||
calls = mock_subprocess.call_args_list
|
||||
trivy_calls = [c for c in calls if c.args[0][0] == "trivy"]
|
||||
assert len(trivy_calls) == 2
|
||||
for call in trivy_calls:
|
||||
env = call.kwargs.get("env") or call[1].get("env")
|
||||
assert env["TRIVY_USERNAME"] == "myuser"
|
||||
assert env["TRIVY_PASSWORD"] == "mypass"
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_test_connection_docker_hub_uses_trivy_auth(self, mock_subprocess):
|
||||
"""Test test_connection passes TRIVY creds for Docker Hub images."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
|
||||
|
||||
result = ImageProvider.test_connection(
|
||||
image="andoniaf/test-private:tag",
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
assert result.is_connected is True
|
||||
assert mock_subprocess.call_count == 1
|
||||
trivy_call = mock_subprocess.call_args
|
||||
assert trivy_call.args[0][0] == "trivy"
|
||||
env = trivy_call.kwargs.get("env") or trivy_call[1].get("env")
|
||||
assert env["TRIVY_USERNAME"] == "myuser"
|
||||
assert env["TRIVY_PASSWORD"] == "mypass"
|
||||
provider.cleanup()
|
||||
provider.cleanup()
|
||||
|
||||
|
||||
class TestImageProviderInputValidation:
|
||||
@@ -921,3 +865,67 @@ class TestImageProviderNameValidation:
|
||||
|
||||
with pytest.raises(ImageListFileReadError):
|
||||
_make_provider(images=None, image_list_file=file_path)
|
||||
|
||||
|
||||
class TestScanPerImage:
|
||||
@patch("subprocess.run")
|
||||
def test_yields_per_image(self, mock_subprocess):
|
||||
"""Test that scan_per_image yields (name, findings) per image."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
provider = _make_provider(images=["alpine:3.18", "nginx:latest"])
|
||||
|
||||
results = list(provider.scan_per_image())
|
||||
|
||||
assert len(results) == 2
|
||||
for name, findings in results:
|
||||
assert isinstance(name, str)
|
||||
assert isinstance(findings, list)
|
||||
assert all(isinstance(f, CheckReportImage) for f in findings)
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_reraises_scan_error(self, mock_subprocess):
|
||||
"""Test that ImageScanError propagates from scan_per_image."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=1, stdout="", stderr="scan failed"
|
||||
)
|
||||
provider = _make_provider(images=["alpine:3.18"])
|
||||
|
||||
with pytest.raises(ImageScanError):
|
||||
list(provider.scan_per_image())
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_skips_generic_error(self, mock_subprocess):
|
||||
"""Test that a generic RuntimeError in _scan_single_image yields empty findings and continues."""
|
||||
|
||||
def side_effect(cmd, **kwargs):
|
||||
if "bad:image" in cmd:
|
||||
raise RuntimeError("unexpected error")
|
||||
return MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
|
||||
mock_subprocess.side_effect = side_effect
|
||||
provider = _make_provider(images=["bad:image", "alpine:3.18"])
|
||||
|
||||
results = list(provider.scan_per_image())
|
||||
|
||||
assert len(results) == 2
|
||||
assert results[0][0] == "bad:image"
|
||||
assert results[0][1] == []
|
||||
assert results[1][0] == "alpine:3.18"
|
||||
assert len(results[1][1]) > 0
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_calls_cleanup(self, mock_subprocess):
|
||||
"""Test that cleanup is called even after scan_per_image completes."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
provider = _make_provider(images=["alpine:3.18"])
|
||||
|
||||
with mock.patch.object(provider, "cleanup") as mock_cleanup:
|
||||
list(provider.scan_per_image())
|
||||
|
||||
mock_cleanup.assert_called_once()
|
||||
|
||||
@@ -152,16 +152,15 @@ class TestEmptyRegistry:
|
||||
|
||||
class TestRegistryList:
|
||||
@patch("prowler.providers.image.image_provider.create_registry_adapter")
|
||||
def test_registry_list_prints_and_exits(self, mock_factory, capsys):
|
||||
def test_registry_list_prints_and_returns(self, mock_factory, capsys):
|
||||
adapter = MagicMock()
|
||||
adapter.list_repositories.return_value = ["app/frontend", "app/backend"]
|
||||
adapter.list_tags.side_effect = [["latest", "v1.0"], ["latest"]]
|
||||
mock_factory.return_value = adapter
|
||||
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
_build_provider(registry_list_images=True)
|
||||
provider = _build_provider(registry_list_images=True)
|
||||
|
||||
assert exc_info.value.code == 0
|
||||
assert provider._listing_only is True
|
||||
captured = capsys.readouterr()
|
||||
assert "app/frontend" in captured.out
|
||||
assert "app/backend" in captured.out
|
||||
@@ -177,10 +176,9 @@ class TestRegistryList:
|
||||
adapter.list_tags.return_value = ["latest"]
|
||||
mock_factory.return_value = adapter
|
||||
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
_build_provider(registry_list_images=True, image_filter="^prod/")
|
||||
provider = _build_provider(registry_list_images=True, image_filter="^prod/")
|
||||
|
||||
assert exc_info.value.code == 0
|
||||
assert provider._listing_only is True
|
||||
captured = capsys.readouterr()
|
||||
assert "prod/app" in captured.out
|
||||
assert "dev/app" not in captured.out
|
||||
@@ -193,10 +191,9 @@ class TestRegistryList:
|
||||
adapter.list_tags.return_value = ["latest", "v1.0", "dev-abc"]
|
||||
mock_factory.return_value = adapter
|
||||
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
_build_provider(registry_list_images=True, tag_filter=r"^v\d+\.\d+$")
|
||||
provider = _build_provider(registry_list_images=True, tag_filter=r"^v\d+\.\d+$")
|
||||
|
||||
assert exc_info.value.code == 0
|
||||
assert provider._listing_only is True
|
||||
captured = capsys.readouterr()
|
||||
assert "v1.0" in captured.out
|
||||
assert "dev-abc" not in captured.out
|
||||
@@ -210,10 +207,9 @@ class TestRegistryList:
|
||||
mock_factory.return_value = adapter
|
||||
|
||||
# max_images=1 would normally raise, but --registry-list skips it
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
_build_provider(registry_list_images=True, max_images=1)
|
||||
provider = _build_provider(registry_list_images=True, max_images=1)
|
||||
|
||||
assert exc_info.value.code == 0
|
||||
assert provider._listing_only is True
|
||||
captured = capsys.readouterr()
|
||||
assert "6 images" in captured.out
|
||||
|
||||
|
||||
Reference in New Issue
Block a user