Compare commits

...

24 Commits

Author SHA1 Message Date
Andoni A.
d03ec8ad09 style(ui): reorder ternary chain in formSchemas to minimize diff against master
Move image and openstack branches to the end of the credentials
ternary chain so oraclecloud, mongodbatlas, and alibabacloud retain
their original indentation from master.
2026-02-24 13:14:26 +01:00
Andoni A.
ea96ad0fd8 fix(ui): restore alibabacloud credential validation in form schema
The alibabacloud branch was accidentally dropped from the credentials
ternary chain when image and openstack providers were added.
2026-02-24 12:45:46 +01:00
Andoni A.
4443fa231a style(ui): change image provider icon background to black 2026-02-23 13:34:56 +01:00
Andoni A.
215e4ce704 fix(ui): add self-contained background to image provider icon
- Add orange rounded-rect background matching other provider badges
- Convert strokes to white for consistent dark mode visibility
2026-02-23 13:22:44 +01:00
Andoni A.
74181346d3 style(ui): use named React import in image-provider-badge 2026-02-23 12:17:31 +01:00
Andoni A.
4450fb45f6 fix(ui): fix image provider filter icon and React import 2026-02-23 11:41:52 +01:00
Andoni A.
1ec79831d7 feat(ui): add image provider support to UI 2026-02-23 11:39:16 +01:00
Andoni A.
18bac5a890 fix: address PR review comments for Image provider
- Extract _is_registry_url() into ImageProvider to deduplicate logic
- Restore relative URL handling in _next_page_url for path-only Link headers
- Yield empty findings on generic error in scan_per_image
- Add credential validation to ImageProviderSecret serializer
2026-02-23 08:04:51 +01:00
Andoni A.
f5f91f85d4 style(api): fix ruff formatting in test_utils.py 2026-02-20 13:40:19 +01:00
Andoni A.
42729bf009 chore(api): revert poetry.lock to master 2026-02-20 13:36:14 +01:00
Andoni A.
384f00e057 chore(api): point prowler dependency back to master 2026-02-20 13:23:08 +01:00
Andoni A.
09deb38585 docs(api): update changelog PR reference for Image provider 2026-02-20 13:20:36 +01:00
Andoni A.
04ac302af2 Merge branch 'master' into feat/PROWLER-940-stage-2-a-image-provider-api-v3 2026-02-20 12:43:26 +01:00
Andoni A.
fbcc8a3139 fix(api): add Image provider test coverage and mutelist exclusion
Add missing API tests for Image provider in get_prowler_provider_kwargs
and prowler_provider_connection_test. Fix mutelist exclusion to also
skip Image provider (uses Trivy's built-in logic, same as IaC). Add
API CHANGELOG entry for image provider support.
2026-02-20 11:16:46 +01:00
Andoni A.
c81239e00b feat(sdk): add Image provider HTML assessment summary
- Add get_image_assessment_summary static method to HTML class
- Support registry URL and image list display modes
- Add tests for both registry and image list scenarios
2026-02-20 10:23:16 +01:00
Andoni A.
0d184b28ff fix(api): detect registry URL vs image reference for Image provider
The Image provider UID was always passed as images=[uid], causing
registry URLs like docker.io/andoniaf to be treated as a single
image. Reuse the same heuristic from test_connection to pass
registry URLs as registry= instead, enabling image enumeration.
2026-02-20 09:09:29 +01:00
Andoni A.
c1c33cc6d8 feat(sdk): add Image provider scan support to Scan class
- Add IMAGE provider guard clauses alongside IaC in Scan init
- Add scan_per_image() execution block with per-image progress tracking
- Bypass check loading for IMAGE provider in checks_loader
- Use image_sha for resource_uid in Finding output
- Add IMAGE entry to API compliance class map
2026-02-20 08:51:28 +01:00
Andoni A.
b13c4fa1b2 fix(sdk): handle registry URL in Image provider connection test
- Detect registry URLs (e.g. docker.io/namespace) vs image references
- Use list_repositories for registry URLs, list_tags for images
- Add test for registry URL connection test
2026-02-20 08:24:59 +01:00
Andoni A.
57ed5a701d fix(sdk): use registry HTTP check for Image provider connection test
- Replace Trivy subprocess with registry adapter for connection testing
- Avoid false auth failures caused by Trivy DB download errors
- Update tests to mock registry adapters instead of subprocess
2026-02-19 18:24:37 +01:00
Andoni A.
6d1ee5e6d0 fix(sdk): allow Trivy DB download in Image provider connection test
- Remove invalid --scanners none flag
- Let Trivy download vulnerability DB on first run
2026-02-19 17:31:12 +01:00
Andoni A.
30533d9a29 fix(sdk): use scanners none for Image provider connection test
- Replace --skip-db-update with --scanners none in test_connection
- Avoids Trivy fatal error when vulnerability DB is not yet downloaded
2026-02-19 17:15:03 +01:00
Andoni A.
ce6d94a32c feat(api): integrate Image provider into API layer
- Add IMAGE to ProviderChoices and validate_image_uid
- Add ImageProviderSecret serializer for registry credentials
- Wire Image provider into connection test and scan initialization
- Rebase migration to 0081 after master merge conflict
2026-02-19 17:09:44 +01:00
Andoni A.
4f8a94f136 Merge remote-tracking branch 'origin/master' into feat/PROWLER-940-stage-2-a-image-provider-api-v3
# Conflicts:
#	poetry.lock
#	prowler/providers/image/exceptions/exceptions.py
#	prowler/providers/image/image_provider.py
#	prowler/providers/image/lib/arguments/arguments.py
#	tests/providers/image/image_fixtures.py
#	tests/providers/image/image_provider_test.py
2026-02-19 16:29:59 +01:00
Andoni A.
2fa4faa7f6 feat: add Image provider support with Trivy-based container scanning
- Add Image provider SDK with registry adapters (Docker Hub, OCI)
- Integrate Image scan path in Scan class alongside IaC provider
- Add CheckReportImage model and Finding output block for image results
- Add IMAGE enum, UID validation, and migration to API models
- Skip compliance and attack surface tasks for Image/IaC providers
- Bump Trivy to v0.69.1 and allow hyphenated CheckIDs for image provider
2026-02-19 15:56:47 +01:00
43 changed files with 1385 additions and 314 deletions

View File

@@ -6,6 +6,7 @@ All notable changes to the **Prowler API** are documented in this file.
### 🚀 Added
- `image` provider support for container image scanning [(#10128)](https://github.com/prowler-cloud/prowler/pull/10128)
- OpenStack provider support [(#10003)](https://github.com/prowler-cloud/prowler/pull/10003)
- PDF report for the CSA CCM compliance framework [(#10088)](https://github.com/prowler-cloud/prowler/pull/10088)

View File

@@ -5,7 +5,7 @@ LABEL maintainer="https://github.com/prowler-cloud/api"
ARG POWERSHELL_VERSION=7.5.0
ENV POWERSHELL_VERSION=${POWERSHELL_VERSION}
ARG TRIVY_VERSION=0.66.0
ARG TRIVY_VERSION=0.69.1
ENV TRIVY_VERSION=${TRIVY_VERSION}
# hadolint ignore=DL3008

View File

@@ -0,0 +1,38 @@
from django.db import migrations
import api.db_utils
class Migration(migrations.Migration):
dependencies = [
("api", "0080_backfill_attack_paths_graph_data_ready"),
]
operations = [
migrations.AlterField(
model_name="provider",
name="provider",
field=api.db_utils.ProviderEnumField(
choices=[
("aws", "AWS"),
("azure", "Azure"),
("gcp", "GCP"),
("kubernetes", "Kubernetes"),
("m365", "M365"),
("github", "GitHub"),
("mongodbatlas", "MongoDB Atlas"),
("iac", "IaC"),
("oraclecloud", "Oracle Cloud Infrastructure"),
("alibabacloud", "Alibaba Cloud"),
("cloudflare", "Cloudflare"),
("openstack", "OpenStack"),
("image", "Image"),
],
default="aws",
),
),
migrations.RunSQL(
"ALTER TYPE provider ADD VALUE IF NOT EXISTS 'image';",
reverse_sql=migrations.RunSQL.noop,
),
]

View File

@@ -289,6 +289,7 @@ class Provider(RowLevelSecurityProtectedModel):
ALIBABACLOUD = "alibabacloud", _("Alibaba Cloud")
CLOUDFLARE = "cloudflare", _("Cloudflare")
OPENSTACK = "openstack", _("OpenStack")
IMAGE = "image", _("Image")
@staticmethod
def validate_aws_uid(value):
@@ -423,6 +424,15 @@ class Provider(RowLevelSecurityProtectedModel):
pointer="/data/attributes/uid",
)
@staticmethod
def validate_image_uid(value):
if not re.match(r"^[a-zA-Z0-9][a-zA-Z0-9._/:@-]{2,249}$", value):
raise ModelValidationError(
detail="Image provider ID must be a valid container image reference.",
code="image-uid",
pointer="/data/attributes/uid",
)
id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
inserted_at = models.DateTimeField(auto_now_add=True, editable=False)
updated_at = models.DateTimeField(auto_now=True, editable=False)

View File

@@ -2,6 +2,7 @@ import pytest
from rest_framework.exceptions import ValidationError
from api.v1.serializer_utils.integrations import S3ConfigSerializer
from api.v1.serializers import ImageProviderSecret
class TestS3ConfigSerializer:
@@ -98,3 +99,37 @@ class TestS3ConfigSerializer:
serializer = S3ConfigSerializer(data=data)
assert not serializer.is_valid()
assert "output_directory" in serializer.errors
class TestImageProviderSecret:
"""Test cases for ImageProviderSecret validation."""
def test_valid_no_credentials(self):
serializer = ImageProviderSecret(data={})
assert serializer.is_valid()
def test_valid_token_only(self):
serializer = ImageProviderSecret(data={"registry_token": "tok"})
assert serializer.is_valid()
def test_valid_username_and_password(self):
serializer = ImageProviderSecret(
data={"registry_username": "user", "registry_password": "pass"}
)
assert serializer.is_valid()
def test_valid_token_with_username_only(self):
serializer = ImageProviderSecret(
data={"registry_token": "tok", "registry_username": "user"}
)
assert serializer.is_valid()
def test_invalid_username_without_password(self):
serializer = ImageProviderSecret(data={"registry_username": "user"})
assert not serializer.is_valid()
assert "non_field_errors" in serializer.errors
def test_invalid_password_without_username(self):
serializer = ImageProviderSecret(data={"registry_password": "pass"})
assert not serializer.is_valid()
assert "non_field_errors" in serializer.errors

View File

@@ -24,6 +24,7 @@ from prowler.providers.cloudflare.cloudflare_provider import CloudflareProvider
from prowler.providers.gcp.gcp_provider import GcpProvider
from prowler.providers.github.github_provider import GithubProvider
from prowler.providers.iac.iac_provider import IacProvider
from prowler.providers.image.image_provider import ImageProvider
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
from prowler.providers.m365.m365_provider import M365Provider
from prowler.providers.mongodbatlas.mongodbatlas_provider import MongodbatlasProvider
@@ -122,6 +123,7 @@ class TestReturnProwlerProvider:
(Provider.ProviderChoices.ALIBABACLOUD.value, AlibabacloudProvider),
(Provider.ProviderChoices.CLOUDFLARE.value, CloudflareProvider),
(Provider.ProviderChoices.OPENSTACK.value, OpenstackProvider),
(Provider.ProviderChoices.IMAGE.value, ImageProvider),
],
)
def test_return_prowler_provider(self, provider_type, expected_provider):
@@ -188,6 +190,47 @@ class TestProwlerProviderConnectionTest:
assert isinstance(connection.error, Provider.secret.RelatedObjectDoesNotExist)
assert str(connection.error) == "Provider has no secret."
@patch("api.utils.return_prowler_provider")
def test_prowler_provider_connection_test_image_provider(
self, mock_return_prowler_provider
):
"""Test connection test for Image provider with credentials."""
provider = MagicMock()
provider.uid = "docker.io/myns/myimage:latest"
provider.provider = Provider.ProviderChoices.IMAGE.value
provider.secret.secret = {
"registry_username": "user",
"registry_password": "pass",
"registry_token": "tok123",
}
mock_return_prowler_provider.return_value = MagicMock()
prowler_provider_connection_test(provider)
mock_return_prowler_provider.return_value.test_connection.assert_called_once_with(
image="docker.io/myns/myimage:latest",
raise_on_exception=False,
registry_username="user",
registry_password="pass",
registry_token="tok123",
)
@patch("api.utils.return_prowler_provider")
def test_prowler_provider_connection_test_image_provider_no_creds(
self, mock_return_prowler_provider
):
"""Test connection test for Image provider without credentials."""
provider = MagicMock()
provider.uid = "alpine:3.18"
provider.provider = Provider.ProviderChoices.IMAGE.value
provider.secret.secret = {}
mock_return_prowler_provider.return_value = MagicMock()
prowler_provider_connection_test(provider)
mock_return_prowler_provider.return_value.test_connection.assert_called_once_with(
image="alpine:3.18",
raise_on_exception=False,
)
class TestGetProwlerProviderKwargs:
@pytest.mark.parametrize(
@@ -336,6 +379,123 @@ class TestGetProwlerProviderKwargs:
}
assert result == expected_result
def test_get_prowler_provider_kwargs_image_provider_registry_url(self):
"""Test that Image provider with a registry URL gets 'registry' kwarg."""
provider_uid = "docker.io/myns"
secret_dict = {
"registry_username": "user",
"registry_password": "pass",
}
secret_mock = MagicMock()
secret_mock.secret = secret_dict
provider = MagicMock()
provider.provider = Provider.ProviderChoices.IMAGE.value
provider.secret = secret_mock
provider.uid = provider_uid
result = get_prowler_provider_kwargs(provider)
expected_result = {
"registry": provider_uid,
"registry_username": "user",
"registry_password": "pass",
}
assert result == expected_result
def test_get_prowler_provider_kwargs_image_provider_image_ref(self):
"""Test that Image provider with a full image reference gets 'images' kwarg."""
provider_uid = "docker.io/myns/myimage:latest"
secret_dict = {
"registry_username": "user",
"registry_password": "pass",
}
secret_mock = MagicMock()
secret_mock.secret = secret_dict
provider = MagicMock()
provider.provider = Provider.ProviderChoices.IMAGE.value
provider.secret = secret_mock
provider.uid = provider_uid
result = get_prowler_provider_kwargs(provider)
expected_result = {
"images": [provider_uid],
"registry_username": "user",
"registry_password": "pass",
}
assert result == expected_result
def test_get_prowler_provider_kwargs_image_provider_dockerhub_image(self):
"""Test that Image provider with a short DockerHub image gets 'images' kwarg."""
provider_uid = "alpine:3.18"
secret_dict = {}
secret_mock = MagicMock()
secret_mock.secret = secret_dict
provider = MagicMock()
provider.provider = Provider.ProviderChoices.IMAGE.value
provider.secret = secret_mock
provider.uid = provider_uid
result = get_prowler_provider_kwargs(provider)
expected_result = {"images": [provider_uid]}
assert result == expected_result
def test_get_prowler_provider_kwargs_image_provider_filters_falsy_secrets(self):
"""Test that falsy secret values are filtered out for Image provider."""
provider_uid = "docker.io/myns/myimage:latest"
secret_dict = {
"registry_username": "",
"registry_password": "",
}
secret_mock = MagicMock()
secret_mock.secret = secret_dict
provider = MagicMock()
provider.provider = Provider.ProviderChoices.IMAGE.value
provider.secret = secret_mock
provider.uid = provider_uid
result = get_prowler_provider_kwargs(provider)
expected_result = {"images": [provider_uid]}
assert result == expected_result
def test_get_prowler_provider_kwargs_image_provider_ignores_mutelist(self):
"""Test that Image provider does NOT receive mutelist_content.
Image provider uses Trivy's built-in mutelist logic, so it should not
receive mutelist_content even when a mutelist processor is configured.
"""
provider_uid = "docker.io/myns/myimage:latest"
secret_dict = {
"registry_username": "user",
"registry_password": "pass",
}
secret_mock = MagicMock()
secret_mock.secret = secret_dict
mutelist_processor = MagicMock()
mutelist_processor.configuration = {"Mutelist": {"key": "value"}}
provider = MagicMock()
provider.provider = Provider.ProviderChoices.IMAGE.value
provider.secret = secret_mock
provider.uid = provider_uid
result = get_prowler_provider_kwargs(provider, mutelist_processor)
assert "mutelist_content" not in result
expected_result = {
"images": [provider_uid],
"registry_username": "user",
"registry_password": "pass",
}
assert result == expected_result
def test_get_prowler_provider_kwargs_unsupported_provider(self):
# Setup
provider_uid = "provider_uid"

View File

@@ -28,6 +28,7 @@ if TYPE_CHECKING:
from prowler.providers.gcp.gcp_provider import GcpProvider
from prowler.providers.github.github_provider import GithubProvider
from prowler.providers.iac.iac_provider import IacProvider
from prowler.providers.image.image_provider import ImageProvider
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
from prowler.providers.m365.m365_provider import M365Provider
from prowler.providers.mongodbatlas.mongodbatlas_provider import (
@@ -83,6 +84,7 @@ def return_prowler_provider(
| GcpProvider
| GithubProvider
| IacProvider
| ImageProvider
| KubernetesProvider
| M365Provider
| MongodbatlasProvider
@@ -95,7 +97,7 @@ def return_prowler_provider(
provider (Provider): The provider object containing the provider type and associated secrets.
Returns:
AlibabacloudProvider | AwsProvider | AzureProvider | CloudflareProvider | GcpProvider | GithubProvider | IacProvider | KubernetesProvider | M365Provider | MongodbatlasProvider | OpenstackProvider | OraclecloudProvider: The corresponding provider class.
AlibabacloudProvider | AwsProvider | AzureProvider | CloudflareProvider | GcpProvider | GithubProvider | IacProvider | ImageProvider | KubernetesProvider | M365Provider | MongodbatlasProvider | OpenstackProvider | OraclecloudProvider: The corresponding provider class.
Raises:
ValueError: If the provider type specified in `provider.provider` is not supported.
@@ -159,6 +161,10 @@ def return_prowler_provider(
from prowler.providers.openstack.openstack_provider import OpenstackProvider
prowler_provider = OpenstackProvider
case Provider.ProviderChoices.IMAGE.value:
from prowler.providers.image.image_provider import ImageProvider
prowler_provider = ImageProvider
case _:
raise ValueError(f"Provider type {provider.provider} not supported")
return prowler_provider
@@ -221,11 +227,29 @@ def get_prowler_provider_kwargs(
# clouds.yaml is not feasible because not all auth methods include it and the
# Keystone API is unavailable on public clouds.
pass
elif provider.provider == Provider.ProviderChoices.IMAGE.value:
# Detect whether uid is a registry URL (e.g. "docker.io/andoniaf") or
# a concrete image reference (e.g. "docker.io/andoniaf/myimage:latest").
from prowler.providers.image.image_provider import ImageProvider
if ImageProvider._is_registry_url(provider.uid):
prowler_provider_kwargs = {
"registry": provider.uid,
**{k: v for k, v in prowler_provider_kwargs.items() if v},
}
else:
prowler_provider_kwargs = {
"images": [provider.uid],
**{k: v for k, v in prowler_provider_kwargs.items() if v},
}
if mutelist_processor:
mutelist_content = mutelist_processor.configuration.get("Mutelist", {})
# IaC provider doesn't support mutelist (uses Trivy's built-in logic)
if mutelist_content and provider.provider != Provider.ProviderChoices.IAC.value:
# IaC and Image providers don't support mutelist (both use Trivy's built-in logic)
if mutelist_content and provider.provider not in (
Provider.ProviderChoices.IAC.value,
Provider.ProviderChoices.IMAGE.value,
):
prowler_provider_kwargs["mutelist_content"] = mutelist_content
return prowler_provider_kwargs
@@ -242,6 +266,7 @@ def initialize_prowler_provider(
| GcpProvider
| GithubProvider
| IacProvider
| ImageProvider
| KubernetesProvider
| M365Provider
| MongodbatlasProvider
@@ -255,7 +280,7 @@ def initialize_prowler_provider(
mutelist_processor (Processor): The mutelist processor object containing the mutelist configuration.
Returns:
AlibabacloudProvider | AwsProvider | AzureProvider | CloudflareProvider | GcpProvider | GithubProvider | IacProvider | KubernetesProvider | M365Provider | MongodbatlasProvider | OpenstackProvider | OraclecloudProvider: An instance of the corresponding provider class
AlibabacloudProvider | AwsProvider | AzureProvider | CloudflareProvider | GcpProvider | GithubProvider | IacProvider | ImageProvider | KubernetesProvider | M365Provider | MongodbatlasProvider | OpenstackProvider | OraclecloudProvider: An instance of the corresponding provider class
initialized with the provider's secrets.
"""
prowler_provider = return_prowler_provider(provider)
@@ -297,6 +322,22 @@ def prowler_provider_connection_test(provider: Provider) -> Connection:
"raise_on_exception": False,
}
return prowler_provider.test_connection(**openstack_kwargs)
elif provider.provider == Provider.ProviderChoices.IMAGE.value:
image_kwargs = {
"image": provider.uid,
"raise_on_exception": False,
}
if prowler_provider_kwargs.get("registry_username"):
image_kwargs["registry_username"] = prowler_provider_kwargs[
"registry_username"
]
if prowler_provider_kwargs.get("registry_password"):
image_kwargs["registry_password"] = prowler_provider_kwargs[
"registry_password"
]
if prowler_provider_kwargs.get("registry_token"):
image_kwargs["registry_token"] = prowler_provider_kwargs["registry_token"]
return prowler_provider.test_connection(**image_kwargs)
else:
return prowler_provider.test_connection(
**prowler_provider_kwargs,

View File

@@ -1528,6 +1528,8 @@ class BaseWriteProviderSecretSerializer(BaseWriteSerializer):
)
elif provider_type == Provider.ProviderChoices.OPENSTACK.value:
serializer = OpenStackCloudsYamlProviderSecret(data=secret)
elif provider_type == Provider.ProviderChoices.IMAGE.value:
serializer = ImageProviderSecret(data=secret)
else:
raise serializers.ValidationError(
{"provider": f"Provider type not supported {provider_type}"}
@@ -1702,6 +1704,30 @@ class OpenStackCloudsYamlProviderSecret(serializers.Serializer):
resource_name = "provider-secrets"
class ImageProviderSecret(serializers.Serializer):
registry_username = serializers.CharField(required=False)
registry_password = serializers.CharField(required=False)
registry_token = serializers.CharField(required=False)
class Meta:
resource_name = "provider-secrets"
def validate(self, attrs):
token = attrs.get("registry_token")
username = attrs.get("registry_username")
password = attrs.get("registry_password")
if not token:
if username and not password:
raise serializers.ValidationError(
"registry_password is required when registry_username is provided."
)
if password and not username:
raise serializers.ValidationError(
"registry_username is required when registry_password is provided."
)
return attrs
class AlibabaCloudProviderSecret(serializers.Serializer):
access_key_id = serializers.CharField()
access_key_secret = serializers.CharField()

View File

@@ -137,6 +137,7 @@ COMPLIANCE_CLASS_MAP = {
# IaC provider doesn't have specific compliance frameworks yet
# Trivy handles its own compliance checks
],
"image": [],
"oraclecloud": [
(lambda name: name.startswith("cis_"), OracleCloudCIS),
(lambda name: name.startswith("csa_"), OracleCloudCSA),

4
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
[[package]]
name = "about-time"
@@ -4874,7 +4874,7 @@ description = "C parser in Python"
optional = false
python-versions = ">=3.8"
groups = ["main", "dev"]
markers = "implementation_name != \"PyPy\" and platform_python_implementation != \"PyPy\""
markers = "platform_python_implementation != \"PyPy\" and implementation_name != \"PyPy\""
files = [
{file = "pycparser-2.22-py3-none-any.whl", hash = "sha256:c3702b6d3dd8c7abc1afa565d7e63d53a1d0bd86cdc24edd75470f4de499cfcc"},
{file = "pycparser-2.22.tar.gz", hash = "sha256:491c8be9c040f5390f5bf44a5b07752bd07f56edf992381b05c701439eec10f6"},

View File

@@ -22,8 +22,8 @@ def load_checks_to_execute(
) -> set:
"""Generate the list of checks to execute based on the cloud provider and the input arguments given"""
try:
# Bypass check loading for IAC provider since it uses Trivy directly
if provider == "iac":
# Bypass check loading for providers that use Trivy directly
if provider in ("iac", "image"):
return set()
# Local subsets

View File

@@ -384,10 +384,12 @@ class Finding(BaseModel):
output_data["auth_method"] = provider.auth_method
output_data["account_uid"] = "image"
output_data["account_name"] = "image"
output_data["resource_name"] = getattr(
check_output, "resource_name", ""
image_name = getattr(check_output, "resource_name", "")
image_sha = getattr(check_output, "image_sha", "")
output_data["resource_name"] = image_name
output_data["resource_uid"] = (
f"{image_name}:{image_sha}" if image_sha else image_name
)
output_data["resource_uid"] = getattr(check_output, "resource_id", "")
output_data["region"] = getattr(check_output, "region", "container")
output_data["package_name"] = getattr(check_output, "package_name", "")
output_data["installed_version"] = getattr(

View File

@@ -930,6 +930,56 @@ class HTML(Output):
)
return ""
@staticmethod
def get_image_assessment_summary(provider: Provider) -> str:
"""
get_image_assessment_summary gets the HTML assessment summary for the Image provider
Args:
provider (Provider): the Image provider object
Returns:
str: the HTML assessment summary
"""
try:
if provider.registry:
target_info = f"<b>Registry URL:</b> {provider.registry}"
else:
target_info = f'<b>Images:</b> {", ".join(provider.images)}'
return f"""
<div class="col-md-2">
<div class="card">
<div class="card-header">
Image Assessment Summary
</div>
<ul class="list-group
list-group-flush">
<li class="list-group-item">
{target_info}
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
Image Credentials
</div>
<ul class="list-group
list-group-flush">
<li class="list-group-item">
<b>Image authentication method:</b> {provider.auth_method}
</li>
</ul>
</div>
</div>"""
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
return ""
@staticmethod
def get_llm_assessment_summary(provider: Provider) -> str:
"""

View File

@@ -27,6 +27,7 @@ from prowler.lib.scan.exceptions.exceptions import (
from prowler.providers.common.models import Audit_Metadata, ProviderOutputOptions
from prowler.providers.common.provider import Provider
from prowler.providers.iac.iac_provider import IacProvider
from prowler.providers.image.image_provider import ImageProvider
class Scan:
@@ -92,10 +93,10 @@ class Scan:
except ValueError:
raise ScanInvalidStatusError(f"Invalid status provided: {s}.")
# Special setup for IaC provider - override inputs to work with traditional flow
if provider.type == "iac":
# IaC doesn't use traditional Prowler checks, so clear all input parameters
# to avoid validation errors and let it flow through the normal logic
# Special setup for IaC/Image providers - override inputs to work with traditional flow
if provider.type in ("iac", "image"):
# These providers don't use traditional Prowler checks, so clear all input parameters
# to avoid validation errors and let them flow through the normal logic
checks = None
services = None
excluded_checks = None
@@ -160,8 +161,8 @@ class Scan:
)
# Load checks to execute
if provider.type == "iac":
self._checks_to_execute = ["iac_scan"] # Dummy check name for IaC
if provider.type in ("iac", "image"):
self._checks_to_execute = [f"{provider.type}_scan"]
else:
self._checks_to_execute = sorted(
load_checks_to_execute(
@@ -200,8 +201,8 @@ class Scan:
self._number_of_checks_to_execute = len(self._checks_to_execute)
# Set up service-based checks tracking
if provider.type == "iac":
service_checks_to_execute = {"iac": set(["iac_scan"])}
if provider.type in ("iac", "image"):
service_checks_to_execute = {provider.type: set([f"{provider.type}_scan"])}
else:
service_checks_to_execute = get_service_checks_to_execute(
self._checks_to_execute
@@ -346,6 +347,75 @@ class Scan:
self._duration = int((end_time - start_time).total_seconds())
return
# Special handling for Image provider
elif self._provider.type == "image":
if isinstance(self._provider, ImageProvider):
logger.info("Running Image scan with Trivy...")
total_images = len(self._provider.images)
images_completed = 0
for image_name, image_findings in self._provider.scan_per_image():
findings = []
for report in image_findings:
finding_uid = f"{report.check_metadata.CheckID}-{report.resource_name}-{report.resource_id}"
status_enum = (
Status.FAIL if report.status == "FAIL" else Status.PASS
)
if report.muted:
status_enum = Status.MUTED
image_sha = getattr(report, "image_sha", "")
resource_uid = (
f"{image_name}:{image_sha}" if image_sha else image_name
)
finding = Finding(
auth_method="Registry",
timestamp=datetime.datetime.now(timezone.utc),
account_uid=getattr(self._provider, "registry", None)
or "image",
account_name="Container Registry",
metadata=report.check_metadata,
uid=finding_uid,
status=status_enum,
status_extended=report.status_extended,
muted=report.muted,
resource_uid=resource_uid,
resource_metadata=report.resource,
resource_name=image_name,
resource_details=report.resource_details,
resource_tags={},
region=report.region,
compliance={},
raw=report.resource,
)
findings.append(finding)
# Filter the findings by the status
if self._status:
findings = [f for f in findings if f.status in self._status]
images_completed += 1
progress = (
images_completed / total_images * 100
if total_images > 0
else 100.0
)
yield (progress, findings)
# Update progress
self._number_of_checks_completed = 1
self._number_of_checks_to_execute = 1
# Calculate duration
end_time = datetime.datetime.now()
self._duration = int((end_time - start_time).total_seconds())
return
for check_name in checks_to_execute:
try:
# Recover service from check name

View File

@@ -285,19 +285,6 @@ class Provider(ABC):
timeout=arguments.timeout,
config_path=arguments.config_file,
fixer_config=fixer_config,
registry_username=getattr(arguments, "registry_username", None),
registry_password=getattr(arguments, "registry_password", None),
registry_token=getattr(arguments, "registry_token", None),
registry=getattr(arguments, "registry", None),
image_filter=getattr(arguments, "image_filter", None),
tag_filter=getattr(arguments, "tag_filter", None),
max_images=getattr(arguments, "max_images", 0),
registry_insecure=getattr(
arguments, "registry_insecure", False
),
registry_list_images=getattr(
arguments, "registry_list_images", False
),
)
elif "mongodbatlas" in provider_class_name.lower():
provider_class(

View File

@@ -31,6 +31,9 @@ from prowler.providers.image.exceptions.exceptions import (
ImageListFileReadError,
ImageMaxImagesExceededError,
ImageNoImagesProvidedError,
ImageRegistryAuthError,
ImageRegistryCatalogError,
ImageRegistryNetworkError,
ImageScanError,
ImageTrivyBinaryNotFoundError,
)
@@ -96,6 +99,7 @@ class ImageProvider(Provider):
self.audited_account = "image-scan"
self._session = None
self._identity = "prowler"
self._listing_only = False
# Registry authentication (follows IaC pattern: explicit params, env vars internal)
self.registry_username = registry_username or os.environ.get(
@@ -107,8 +111,8 @@ class ImageProvider(Provider):
self.registry_token = registry_token or os.environ.get("REGISTRY_TOKEN")
if self.registry_username and self.registry_password:
self._auth_method = "Basic auth"
logger.info("Using basic auth for registry authentication")
self._auth_method = "Docker login"
logger.info("Using docker login for registry authentication")
elif self.registry_token:
self._auth_method = "Registry token"
logger.info("Using registry token for authentication")
@@ -152,6 +156,8 @@ class ImageProvider(Provider):
# Registry scan mode: enumerate images from registry
if self.registry:
self._enumerate_registry()
if self._listing_only:
return
for image in self.images:
self._validate_image_name(image)
@@ -319,40 +325,61 @@ class ImageProvider(Provider):
return parts[0]
return None
@staticmethod
def _is_registry_url(image_uid: str) -> bool:
"""Determine whether an image UID is a registry URL (namespace only).
A registry URL like ``docker.io/andoniaf`` has a registry host but
the remaining part contains no ``/`` (no repo) and no ``:`` (no tag).
"""
registry_host = ImageProvider._extract_registry(image_uid)
if not registry_host:
return False
repo_and_tag = image_uid[len(registry_host) + 1 :]
return "/" not in repo_and_tag and ":" not in repo_and_tag
def cleanup(self) -> None:
"""Clean up any resources after scanning."""
def _process_finding(
self, finding: dict, image_name: str, finding_type: str
self,
finding: dict,
image: str,
trivy_target: str,
image_sha: str = "",
) -> CheckReportImage:
"""
Process a single finding and create a CheckReportImage object.
Args:
finding: The finding object from Trivy output
image_name: The container image name being scanned
finding_type: The type of finding (Vulnerability, Secret, etc.)
image: The clean container image name (e.g., "alpine:3.18")
trivy_target: The Trivy target string (e.g., "alpine:3.18 (alpine 3.18.0)")
image_sha: Short SHA from Trivy Metadata.ImageID for resource uniqueness
Returns:
CheckReportImage: The processed check report
"""
try:
# Determine finding ID based on type
# Determine finding ID and category based on type
if "VulnerabilityID" in finding:
finding_id = finding["VulnerabilityID"]
finding_description = finding.get(
"Description", finding.get("Title", "")
)
finding_status = "FAIL"
finding_categories = ["vulnerability"]
elif "RuleID" in finding:
# Secret finding
finding_id = finding["RuleID"]
finding_description = finding.get("Title", "Secret detected")
finding_status = "FAIL"
finding_categories = ["secrets"]
else:
finding_id = finding.get("ID", "UNKNOWN")
finding_description = finding.get("Description", "")
finding_status = finding.get("Status", "FAIL")
finding_categories = []
# Build remediation text for vulnerabilities
remediation_text = ""
@@ -371,7 +398,7 @@ class ImageProvider(Provider):
"CheckID": finding_id,
"CheckTitle": finding.get("Title", finding_id),
"CheckType": ["Container Image Security"],
"ServiceName": finding_type,
"ServiceName": "container-image",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": trivy_severity,
@@ -381,7 +408,7 @@ class ImageProvider(Provider):
"Risk": finding.get(
"Description", "Vulnerability detected in container image"
),
"RelatedUrl": finding.get("PrimaryURL", ""),
"RelatedUrl": "",
"Remediation": {
"Code": {
"NativeIaC": "",
@@ -394,7 +421,7 @@ class ImageProvider(Provider):
"Url": finding.get("PrimaryURL", ""),
},
},
"Categories": [],
"Categories": finding_categories,
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
@@ -404,11 +431,13 @@ class ImageProvider(Provider):
metadata = json.dumps(metadata_dict)
report = CheckReportImage(
metadata=metadata, finding=finding, image_name=image_name
metadata=metadata, finding=finding, image_name=image
)
report.status = finding_status
report.status_extended = self._build_status_extended(finding)
report.region = self.region
report.image_sha = image_sha
report.resource_details = trivy_target
return report
except Exception as error:
@@ -453,6 +482,29 @@ class ImageProvider(Provider):
finally:
self.cleanup()
def scan_per_image(
self,
) -> Generator[tuple[str, list[CheckReportImage]], None, None]:
"""Scan images one by one, yielding (image_name, findings) per image.
Unlike run() which returns all findings at once, this method yields
after each image completes, enabling progress tracking.
"""
try:
for image in self.images:
try:
image_findings = []
for batch in self._scan_single_image(image):
image_findings.extend(batch)
yield (image, image_findings)
except (ImageScanError, ImageTrivyBinaryNotFoundError):
raise
except Exception as error:
logger.error(f"Error scanning image {image}: {error}")
yield (image, [])
finally:
self.cleanup()
def run_scan(self) -> Generator[list[CheckReportImage], None, None]:
"""
Run Trivy scan on all configured images.
@@ -534,6 +586,19 @@ class ImageProvider(Provider):
logger.info(f"No findings for image: {image}")
return
# Extract image digest for resource uniqueness
trivy_metadata = output.get("Metadata", {})
image_id = trivy_metadata.get("ImageID", "")
if not image_id:
repo_digests = trivy_metadata.get("RepoDigests", [])
if repo_digests:
image_id = (
repo_digests[0].split("@")[-1]
if "@" in repo_digests[0]
else ""
)
short_sha = image_id.replace("sha256:", "")[:12] if image_id else ""
except json.JSONDecodeError as error:
logger.error(f"Failed to parse Trivy output for {image}: {error}")
logger.debug(f"Trivy stdout: {process.stdout[:500]}")
@@ -544,11 +609,12 @@ class ImageProvider(Provider):
for result in results:
target = result.get("Target", image)
result_type = result.get("Type", "unknown")
# Process Vulnerabilities
for vuln in result.get("Vulnerabilities", []):
report = self._process_finding(vuln, target, result_type)
report = self._process_finding(
vuln, image, target, image_sha=short_sha
)
batch.append(report)
if len(batch) >= self.FINDING_BATCH_SIZE:
yield batch
@@ -556,7 +622,9 @@ class ImageProvider(Provider):
# Process Secrets
for secret in result.get("Secrets", []):
report = self._process_finding(secret, target, "secret")
report = self._process_finding(
secret, image, target, image_sha=short_sha
)
batch.append(report)
if len(batch) >= self.FINDING_BATCH_SIZE:
yield batch
@@ -565,7 +633,7 @@ class ImageProvider(Provider):
# Process Misconfigurations (from Dockerfile)
for misconfig in result.get("Misconfigurations", []):
report = self._process_finding(
misconfig, target, "misconfiguration"
misconfig, image, target, image_sha=short_sha
)
batch.append(report)
if len(batch) >= self.FINDING_BATCH_SIZE:
@@ -679,7 +747,7 @@ class ImageProvider(Provider):
lower = error_msg.lower()
if any(kw in lower for kw in ("401", "403", "unauthorized", "denied")):
return f"Auth failure — check registry credentials: {error_msg}"
return f"Auth failure — check `docker login`: {error_msg}"
if any(kw in lower for kw in ("404", "manifest unknown", "not found")):
return f"Image not found — check name/tag/registry: {error_msg}"
if any(kw in lower for kw in ("429", "rate limit", "too many requests")):
@@ -747,10 +815,11 @@ class ImageProvider(Provider):
image_ref = f"{registry_host}/{repo}:{tag}"
discovered_images.append(image_ref)
# Registry list mode: print listing and exit
# Registry list mode: print listing and return early
if self.registry_list_images:
self._print_registry_listing(repos_tags, len(discovered_images))
raise SystemExit(0)
self._listing_only = True
return
# Check max-images limit
if self.max_images and len(discovered_images) > self.max_images:
@@ -848,10 +917,19 @@ class ImageProvider(Provider):
registry_token: str | None = None,
) -> "Connection":
"""
Test connection to container registry by attempting to inspect an image.
Test connection to container registry by verifying image accessibility.
Handles two cases:
- Image reference (e.g. ``alpine:3.18``, ``ghcr.io/user/repo:tag``):
verifies the specific tag exists.
- Registry URL (e.g. ``docker.io/namespace``, ``ghcr.io/org``):
verifies we can list repositories in that namespace.
Uses registry HTTP APIs directly instead of Trivy to avoid false
failures caused by Trivy DB download issues.
Args:
image: Container image to test
image: Container image or registry URL to test
raise_on_exception: Whether to raise exceptions
provider_id: Fallback for image name
registry_username: Registry username for basic auth
@@ -868,58 +946,65 @@ class ImageProvider(Provider):
if not image:
return Connection(is_connected=False, error="Image name is required")
# Build env with registry credentials
env = dict(os.environ)
if registry_username and registry_password:
env["TRIVY_USERNAME"] = registry_username
env["TRIVY_PASSWORD"] = registry_password
elif registry_token:
env["TRIVY_REGISTRY_TOKEN"] = registry_token
# Test by running trivy with --skip-update to just test image access
process = subprocess.run(
[
"trivy",
"image",
"--skip-db-update",
"--download-db-only=false",
image,
],
capture_output=True,
text=True,
timeout=60,
env=env,
)
if process.returncode == 0:
if ImageProvider._is_registry_url(image):
# Registry enumeration mode — test by listing repositories
adapter = create_registry_adapter(
registry_url=image,
username=registry_username,
password=registry_password,
token=registry_token,
)
adapter.list_repositories()
return Connection(is_connected=True)
else:
error_msg = process.stderr or "Unknown error"
if "401" in error_msg or "unauthorized" in error_msg.lower():
return Connection(
is_connected=False,
error="Authentication failed. Check registry credentials.",
)
elif "not found" in error_msg.lower() or "404" in error_msg:
return Connection(
is_connected=False,
error="Image not found in registry.",
)
else:
return Connection(
is_connected=False,
error=f"Failed to access image: {error_msg[:200]}",
)
except subprocess.TimeoutExpired:
return Connection(
is_connected=False,
error="Connection timed out",
# Image reference mode — verify the specific tag exists
registry_host = ImageProvider._extract_registry(image)
repo_and_tag = image[len(registry_host) + 1 :] if registry_host else image
if ":" in repo_and_tag:
repository, tag = repo_and_tag.rsplit(":", 1)
else:
repository = repo_and_tag
tag = "latest"
is_dockerhub = not registry_host or registry_host in (
"docker.io",
"registry-1.docker.io",
)
except FileNotFoundError:
# Docker Hub official images use "library/" prefix
if is_dockerhub and "/" not in repository:
repository = f"library/{repository}"
if is_dockerhub:
registry_url = f"docker.io/{repository.split('/')[0]}"
else:
registry_url = registry_host
adapter = create_registry_adapter(
registry_url=registry_url,
username=registry_username,
password=registry_password,
token=registry_token,
)
tags = adapter.list_tags(repository)
if tag not in tags:
return Connection(
is_connected=False,
error=f"Tag '{tag}' not found for image '{image}'.",
)
return Connection(is_connected=True)
except ImageRegistryAuthError:
return Connection(
is_connected=False,
error="Trivy binary not found. Please install Trivy.",
error="Authentication failed. Check registry credentials.",
)
except (ImageRegistryNetworkError, ImageRegistryCatalogError) as exc:
return Connection(
is_connected=False,
error=f"Failed to access image: {str(exc)[:200]}",
)
except Exception as error:
if raise_on_exception:

View File

@@ -9,11 +9,13 @@ from urllib.parse import urlparse
import requests
from prowler.config.config import prowler_version
from prowler.lib.logger import logger
from prowler.providers.image.exceptions.exceptions import ImageRegistryNetworkError
_MAX_RETRIES = 3
_BACKOFF_BASE = 1
_USER_AGENT = f"Prowler/{prowler_version} (registry-adapter)"
class RegistryAdapter(ABC):
@@ -70,8 +72,12 @@ class RegistryAdapter(ABC):
context_label = kwargs.pop("context_label", None) or self.registry_url
kwargs.setdefault("timeout", 30)
kwargs.setdefault("verify", self.verify_ssl)
headers = kwargs.get("headers", {})
headers.setdefault("User-Agent", _USER_AGENT)
kwargs["headers"] = headers
last_exception = None
last_status = None
last_body = None
for attempt in range(1, _MAX_RETRIES + 1):
try:
resp = requests.request(method, url, **kwargs)
@@ -83,6 +89,16 @@ class RegistryAdapter(ABC):
)
time.sleep(wait)
continue
if resp.status_code >= 500:
last_status = resp.status_code
last_body = (resp.text or "")[:500]
wait = _BACKOFF_BASE * (2 ** (attempt - 1))
logger.warning(
f"Server error from {context_label} (HTTP {resp.status_code}), "
f"retrying in {wait}s (attempt {attempt}/{_MAX_RETRIES}): {last_body}"
)
time.sleep(wait)
continue
return resp
except requests.exceptions.ConnectionError as exc:
last_exception = exc
@@ -104,21 +120,27 @@ class RegistryAdapter(ABC):
file=__file__,
message=f"Rate limited by {context_label} after {_MAX_RETRIES} attempts.",
)
if last_status is not None and last_status >= 500:
raise ImageRegistryNetworkError(
file=__file__,
message=f"Server error from {context_label} (HTTP {last_status}) after {_MAX_RETRIES} attempts: {last_body}",
)
raise ImageRegistryNetworkError(
file=__file__,
message=f"Failed to connect to {context_label} after {_MAX_RETRIES} attempts.",
original_exception=last_exception,
)
def _next_page_url(self, resp: requests.Response) -> str | None:
@staticmethod
def _next_page_url(resp: requests.Response) -> str | None:
link_header = resp.headers.get("Link", "")
if not link_header:
return None
match = re.search(r'<([^>]+)>;\s*rel="next"', link_header)
if not match:
return None
url = match.group(1)
if url.startswith("/"):
parsed = urlparse(resp.url)
return f"{parsed.scheme}://{parsed.netloc}{url}"
return url
if match:
url = match.group(1)
if url.startswith("/"):
parsed = urlparse(resp.url)
return f"{parsed.scheme}://{parsed.netloc}{url}"
return url
return None

View File

@@ -115,6 +115,7 @@ class DockerHubAdapter(RegistryAdapter):
return
if not self.username or not self.password:
return
logger.debug(f"Docker Hub login attempt for username: {self.username!r}")
resp = self._request_with_retry(
"POST",
f"{_HUB_API}/v2/users/login",
@@ -122,9 +123,14 @@ class DockerHubAdapter(RegistryAdapter):
context_label="Docker Hub",
)
if resp.status_code != 200:
body_preview = resp.text[:200] if resp.text else "(empty body)"
raise ImageRegistryAuthError(
file=__file__,
message=f"Docker Hub login failed (HTTP {resp.status_code}). Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
message=(
f"Docker Hub login failed (HTTP {resp.status_code}). "
f"Check REGISTRY_USERNAME and REGISTRY_PASSWORD. "
f"Response: {body_preview}"
),
)
self._hub_jwt = resp.json().get("token")
if not self._hub_jwt:

View File

@@ -1,7 +1,7 @@
import sys
from io import StringIO
from mock import patch
from mock import MagicMock, patch
from prowler.config.config import prowler_version, timestamp
from prowler.lib.logger import logger
@@ -350,6 +350,62 @@ mongodbatlas_html_assessment_summary = """
</div>
</div>"""
image_registry_html_assessment_summary = """
<div class="col-md-2">
<div class="card">
<div class="card-header">
Image Assessment Summary
</div>
<ul class="list-group
list-group-flush">
<li class="list-group-item">
<b>Registry URL:</b> myregistry.io
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
Image Credentials
</div>
<ul class="list-group
list-group-flush">
<li class="list-group-item">
<b>Image authentication method:</b> Docker login
</li>
</ul>
</div>
</div>"""
image_list_html_assessment_summary = """
<div class="col-md-2">
<div class="card">
<div class="card-header">
Image Assessment Summary
</div>
<ul class="list-group
list-group-flush">
<li class="list-group-item">
<b>Images:</b> nginx:latest, alpine:3.18
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
Image Credentials
</div>
<ul class="list-group
list-group-flush">
<li class="list-group-item">
<b>Image authentication method:</b> No auth
</li>
</ul>
</div>
</div>"""
def get_aws_html_header(args: list) -> str:
"""
@@ -854,6 +910,36 @@ class TestHTML:
assert summary == mongodbatlas_html_assessment_summary
def test_image_get_assessment_summary_with_registry(self):
"""Test Image HTML assessment summary with registry URL."""
findings = [generate_finding_output()]
output = HTML(findings)
provider = MagicMock()
provider.type = "image"
provider.registry = "myregistry.io"
provider.images = ["nginx:latest", "alpine:3.18"]
provider.auth_method = "Docker login"
summary = output.get_assessment_summary(provider)
assert summary == image_registry_html_assessment_summary
def test_image_get_assessment_summary_with_images(self):
"""Test Image HTML assessment summary with image list."""
findings = [generate_finding_output()]
output = HTML(findings)
provider = MagicMock()
provider.type = "image"
provider.registry = None
provider.images = ["nginx:latest", "alpine:3.18"]
provider.auth_method = "No auth"
summary = output.get_assessment_summary(provider)
assert summary == image_list_html_assessment_summary
def test_process_markdown_bold_text(self):
"""Test that **text** is converted to <strong>text</strong>"""
test_text = "This is **bold text** and this is **also bold**"

View File

@@ -45,8 +45,16 @@ SAMPLE_UNKNOWN_SEVERITY_FINDING = {
"Description": "An issue with unknown severity.",
}
# Sample image SHA for testing (first 12 chars of a sha256 digest)
SAMPLE_IMAGE_SHA = "c1aabb73d233"
SAMPLE_IMAGE_ID = f"sha256:{SAMPLE_IMAGE_SHA}abcdef1234567890"
# Full Trivy JSON output structure with a single vulnerability
SAMPLE_TRIVY_IMAGE_OUTPUT = {
"Metadata": {
"ImageID": SAMPLE_IMAGE_ID,
"RepoDigests": [f"alpine@sha256:{SAMPLE_IMAGE_SHA}abcdef1234567890"],
},
"Results": [
{
"Target": "alpine:3.18 (alpine 3.18.0)",
@@ -55,11 +63,15 @@ SAMPLE_TRIVY_IMAGE_OUTPUT = {
"Secrets": [],
"Misconfigurations": [],
}
]
],
}
# Full Trivy JSON output with mixed finding types
SAMPLE_TRIVY_MULTI_TYPE_OUTPUT = {
"Metadata": {
"ImageID": SAMPLE_IMAGE_ID,
"RepoDigests": [f"myimage@sha256:{SAMPLE_IMAGE_SHA}abcdef1234567890"],
},
"Results": [
{
"Target": "myimage:latest (debian 12)",
@@ -68,7 +80,36 @@ SAMPLE_TRIVY_MULTI_TYPE_OUTPUT = {
"Secrets": [SAMPLE_SECRET_FINDING],
"Misconfigurations": [SAMPLE_MISCONFIGURATION_FINDING],
}
]
],
}
# Trivy output with only RepoDigests (no ImageID) for fallback testing
SAMPLE_TRIVY_REPO_DIGEST_ONLY_OUTPUT = {
"Metadata": {
"RepoDigests": ["alpine@sha256:e5f6g7h8i9j0abcdef1234567890"],
},
"Results": [
{
"Target": "alpine:3.18 (alpine 3.18.0)",
"Type": "alpine",
"Vulnerabilities": [SAMPLE_VULNERABILITY_FINDING],
"Secrets": [],
"Misconfigurations": [],
}
],
}
# Trivy output with no Metadata at all
SAMPLE_TRIVY_NO_METADATA_OUTPUT = {
"Results": [
{
"Target": "alpine:3.18 (alpine 3.18.0)",
"Type": "alpine",
"Vulnerabilities": [SAMPLE_VULNERABILITY_FINDING],
"Secrets": [],
"Misconfigurations": [],
}
],
}
@@ -90,3 +131,13 @@ def get_invalid_trivy_output():
def get_multi_type_trivy_output():
"""Return Trivy output with multiple finding types as string."""
return json.dumps(SAMPLE_TRIVY_MULTI_TYPE_OUTPUT)
def get_repo_digest_only_trivy_output():
"""Return Trivy output with only RepoDigests (no ImageID) as string."""
return json.dumps(SAMPLE_TRIVY_REPO_DIGEST_ONLY_OUTPUT)
def get_no_metadata_trivy_output():
"""Return Trivy output with no Metadata as string."""
return json.dumps(SAMPLE_TRIVY_NO_METADATA_OUTPUT)

View File

@@ -15,11 +15,13 @@ from prowler.providers.image.exceptions.exceptions import (
ImageListFileNotFoundError,
ImageListFileReadError,
ImageNoImagesProvidedError,
ImageRegistryAuthError,
ImageScanError,
ImageTrivyBinaryNotFoundError,
)
from prowler.providers.image.image_provider import ImageProvider
from tests.providers.image.image_fixtures import (
SAMPLE_IMAGE_SHA,
SAMPLE_MISCONFIGURATION_FINDING,
SAMPLE_SECRET_FINDING,
SAMPLE_UNKNOWN_SEVERITY_FINDING,
@@ -27,6 +29,8 @@ from tests.providers.image.image_fixtures import (
get_empty_trivy_output,
get_invalid_trivy_output,
get_multi_type_trivy_output,
get_no_metadata_trivy_output,
get_repo_digest_only_trivy_output,
get_sample_trivy_json_output,
)
@@ -42,10 +46,6 @@ def _make_provider(**kwargs):
class TestImageProvider:
@patch.dict(
os.environ,
{"REGISTRY_USERNAME": "", "REGISTRY_PASSWORD": "", "REGISTRY_TOKEN": ""},
)
def test_image_provider(self):
"""Test default initialization."""
provider = _make_provider()
@@ -124,22 +124,27 @@ class TestImageProvider:
provider = _make_provider()
report = provider._process_finding(
SAMPLE_VULNERABILITY_FINDING,
"alpine:3.18",
"alpine:3.18 (alpine 3.18.0)",
"alpine",
image_sha="c1aabb73d233",
)
assert isinstance(report, CheckReportImage)
assert report.status == "FAIL"
assert report.check_metadata.CheckID == "CVE-2024-1234"
assert report.check_metadata.Severity == "high"
assert report.check_metadata.ServiceName == "alpine"
assert report.check_metadata.ServiceName == "container-image"
assert report.check_metadata.ResourceType == "container-image"
assert report.check_metadata.ResourceGroup == "container"
assert report.package_name == "openssl"
assert report.installed_version == "1.1.1k-r0"
assert report.fixed_version == "1.1.1l-r0"
assert report.resource_name == "alpine:3.18 (alpine 3.18.0)"
assert report.resource_name == "alpine:3.18"
assert report.image_sha == "c1aabb73d233"
assert report.resource_details == "alpine:3.18 (alpine 3.18.0)"
assert report.region == "container"
assert report.check_metadata.Categories == ["vulnerability"]
assert report.check_metadata.RelatedUrl == ""
def test_process_finding_secret(self):
"""Test processing a secret finding (identified by RuleID)."""
@@ -147,14 +152,15 @@ class TestImageProvider:
report = provider._process_finding(
SAMPLE_SECRET_FINDING,
"myimage:latest",
"secret",
"myimage:latest (debian 12)",
)
assert isinstance(report, CheckReportImage)
assert report.status == "FAIL"
assert report.check_metadata.CheckID == "aws-access-key-id"
assert report.check_metadata.Severity == "critical"
assert report.check_metadata.ServiceName == "secret"
assert report.check_metadata.ServiceName == "container-image"
assert report.check_metadata.Categories == ["secrets"]
def test_process_finding_misconfiguration(self):
"""Test processing a misconfiguration finding (identified by ID)."""
@@ -162,13 +168,14 @@ class TestImageProvider:
report = provider._process_finding(
SAMPLE_MISCONFIGURATION_FINDING,
"myimage:latest",
"misconfiguration",
"myimage:latest (debian 12)",
)
assert isinstance(report, CheckReportImage)
assert report.check_metadata.CheckID == "DS001"
assert report.check_metadata.Severity == "medium"
assert report.check_metadata.ServiceName == "misconfiguration"
assert report.check_metadata.ServiceName == "container-image"
assert report.check_metadata.Categories == []
def test_process_finding_unknown_severity(self):
"""Test that UNKNOWN severity is mapped to informational."""
@@ -176,7 +183,7 @@ class TestImageProvider:
report = provider._process_finding(
SAMPLE_UNKNOWN_SEVERITY_FINDING,
"myimage:latest",
"alpine",
"myimage:latest (alpine 3.18.0)",
)
assert report.check_metadata.Severity == "informational"
@@ -195,6 +202,9 @@ class TestImageProvider:
assert len(reports) == 1
assert reports[0].check_metadata.CheckID == "CVE-2024-1234"
assert reports[0].image_sha == SAMPLE_IMAGE_SHA
assert reports[0].resource_name == "alpine:3.18"
assert reports[0].check_metadata.ServiceName == "container-image"
@patch("subprocess.run")
def test_run_scan_empty_output(self, mock_subprocess):
@@ -279,20 +289,23 @@ class TestImageProvider:
)
assert "alpine:3.18" in output
@patch("subprocess.run")
def test_test_connection_success(self, mock_subprocess):
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_test_connection_success(self, mock_factory):
"""Test successful connection returns is_connected=True."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
mock_adapter = MagicMock()
mock_adapter.list_tags.return_value = ["3.18", "latest"]
mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(image="alpine:3.18")
assert result.is_connected is True
mock_adapter.list_tags.assert_called_once_with("library/alpine")
@patch("subprocess.run")
def test_test_connection_auth_failure(self, mock_subprocess):
"""Test 401 error returns auth failure."""
mock_subprocess.return_value = MagicMock(
returncode=1, stderr="401 unauthorized"
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_test_connection_auth_failure(self, mock_factory):
"""Test registry auth error returns auth failure."""
mock_factory.return_value = MagicMock(
list_tags=MagicMock(side_effect=ImageRegistryAuthError(file=__file__))
)
result = ImageProvider.test_connection(image="private/image:latest")
@@ -300,16 +313,36 @@ class TestImageProvider:
assert result.is_connected is False
assert "Authentication failed" in result.error
@patch("subprocess.run")
def test_test_connection_not_found(self, mock_subprocess):
"""Test 404 error returns not found."""
mock_subprocess.return_value = MagicMock(returncode=1, stderr="404 not found")
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_test_connection_not_found(self, mock_factory):
"""Test tag not found returns not found error."""
mock_adapter = MagicMock()
mock_adapter.list_tags.return_value = ["v1", "v2"]
mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(image="nonexistent/image:latest")
assert result.is_connected is False
assert "not found" in result.error
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_test_connection_registry_url(self, mock_factory):
"""Test registry URL (namespace) uses list_repositories."""
mock_adapter = MagicMock()
mock_adapter.list_repositories.return_value = ["andoniaf/myapp"]
mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(image="docker.io/andoniaf")
assert result.is_connected is True
mock_factory.assert_called_once_with(
registry_url="docker.io/andoniaf",
username=None,
password=None,
token=None,
)
mock_adapter.list_repositories.assert_called_once()
def test_build_status_extended(self):
"""Test status message content for different finding types."""
provider = _make_provider()
@@ -394,6 +427,51 @@ class TestImageProvider:
for _ in provider._scan_single_image("private/image:latest"):
pass
@patch("subprocess.run")
def test_sha_extraction_from_image_id(self, mock_subprocess):
"""Test that image_sha is extracted from Trivy Metadata.ImageID."""
provider = _make_provider()
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
reports = []
for batch in provider._scan_single_image("alpine:3.18"):
reports.extend(batch)
assert len(reports) == 1
assert reports[0].image_sha == SAMPLE_IMAGE_SHA
@patch("subprocess.run")
def test_sha_extraction_fallback_to_repo_digests(self, mock_subprocess):
"""Test that image_sha falls back to RepoDigests when ImageID is absent."""
provider = _make_provider()
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_repo_digest_only_trivy_output(), stderr=""
)
reports = []
for batch in provider._scan_single_image("alpine:3.18"):
reports.extend(batch)
assert len(reports) == 1
assert reports[0].image_sha == "e5f6g7h8i9j0"
@patch("subprocess.run")
def test_sha_extraction_no_metadata(self, mock_subprocess):
"""Test that image_sha is empty when no Metadata is present."""
provider = _make_provider()
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_no_metadata_trivy_output(), stderr=""
)
reports = []
for batch in provider._scan_single_image("alpine:3.18"):
reports.extend(batch)
assert len(reports) == 1
assert reports[0].image_sha == ""
@patch("subprocess.run")
def test_run_scan_propagates_scan_error(self, mock_subprocess):
"""Test that run_scan() re-raises ImageScanError instead of swallowing it."""
@@ -409,17 +487,14 @@ class TestImageProvider:
pass
@patch.dict(
os.environ, {"REGISTRY_USERNAME": "", "REGISTRY_PASSWORD": "", "REGISTRY_TOKEN": ""}
)
class TestImageProviderRegistryAuth:
def test_no_auth_by_default(self):
"""Test that no auth is set when no credentials are provided."""
provider = _make_provider()
assert not provider.registry_username
assert not provider.registry_password
assert not provider.registry_token
assert provider.registry_username is None
assert provider.registry_password is None
assert provider.registry_token is None
assert provider.auth_method == "No auth"
def test_basic_auth_with_explicit_params(self):
@@ -431,7 +506,7 @@ class TestImageProviderRegistryAuth:
assert provider.registry_username == "myuser"
assert provider.registry_password == "mypass"
assert provider.auth_method == "Basic auth"
assert provider.auth_method == "Docker login"
def test_token_auth_with_explicit_param(self):
"""Test token auth via explicit constructor param."""
@@ -448,7 +523,7 @@ class TestImageProviderRegistryAuth:
registry_token="my-token",
)
assert provider.auth_method == "Basic auth"
assert provider.auth_method == "Docker login"
@patch.dict(
os.environ, {"REGISTRY_USERNAME": "envuser", "REGISTRY_PASSWORD": "envpass"}
@@ -459,7 +534,7 @@ class TestImageProviderRegistryAuth:
assert provider.registry_username == "envuser"
assert provider.registry_password == "envpass"
assert provider.auth_method == "Basic auth"
assert provider.auth_method == "Docker login"
@patch.dict(os.environ, {"REGISTRY_TOKEN": "env-token"})
def test_token_auth_from_env_var(self):
@@ -491,8 +566,8 @@ class TestImageProviderRegistryAuth:
assert "TRIVY_PASSWORD" not in env
assert "TRIVY_REGISTRY_TOKEN" not in env
def test_build_trivy_env_basic_auth_injects_trivy_vars(self):
"""Test that _build_trivy_env injects TRIVY_USERNAME/PASSWORD for Trivy native auth."""
def test_build_trivy_env_basic_auth_sets_env_vars(self):
"""Test that _build_trivy_env injects TRIVY_USERNAME/PASSWORD for native Trivy auth."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
@@ -510,8 +585,8 @@ class TestImageProviderRegistryAuth:
assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
@patch("subprocess.run")
def test_execute_trivy_injects_trivy_env_with_basic_auth(self, mock_subprocess):
"""Test that _execute_trivy sets TRIVY_USERNAME/PASSWORD for Trivy native auth."""
def test_execute_trivy_sets_trivy_env_with_basic_auth(self, mock_subprocess):
"""Test that _execute_trivy sets TRIVY_USERNAME/PASSWORD for native Trivy auth."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
@@ -527,10 +602,12 @@ class TestImageProviderRegistryAuth:
assert env["TRIVY_USERNAME"] == "myuser"
assert env["TRIVY_PASSWORD"] == "mypass"
@patch("subprocess.run")
def test_test_connection_with_basic_auth(self, mock_subprocess):
"""Test test_connection passes TRIVY_USERNAME/PASSWORD via env for Trivy native auth."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_test_connection_with_basic_auth(self, mock_factory):
"""Test test_connection passes credentials to the registry adapter."""
mock_adapter = MagicMock()
mock_adapter.list_tags.return_value = ["v1"]
mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(
image="private.registry.io/myapp:v1",
@@ -539,18 +616,19 @@ class TestImageProviderRegistryAuth:
)
assert result.is_connected is True
# Should have 1 subprocess call: trivy only (no docker login/pull/logout)
assert mock_subprocess.call_count == 1
trivy_call = mock_subprocess.call_args
assert trivy_call.args[0][0] == "trivy"
env = trivy_call.kwargs.get("env") or trivy_call[1].get("env")
assert env["TRIVY_USERNAME"] == "myuser"
assert env["TRIVY_PASSWORD"] == "mypass"
mock_factory.assert_called_once_with(
registry_url="private.registry.io",
username="myuser",
password="mypass",
token=None,
)
@patch("subprocess.run")
def test_test_connection_with_token(self, mock_subprocess):
"""Test test_connection passes token via env."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_test_connection_with_token(self, mock_factory):
"""Test test_connection passes token to the registry adapter."""
mock_adapter = MagicMock()
mock_adapter.list_tags.return_value = ["v1"]
mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(
image="private.registry.io/myapp:v1",
@@ -558,9 +636,12 @@ class TestImageProviderRegistryAuth:
)
assert result.is_connected is True
call_kwargs = mock_subprocess.call_args
env = call_kwargs.kwargs.get("env") or call_kwargs[1].get("env")
assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
mock_factory.assert_called_once_with(
registry_url="private.registry.io",
username=None,
password=None,
token="my-token",
)
def test_print_credentials_shows_auth_method(self):
"""Test that print_credentials outputs the auth method."""
@@ -573,7 +654,7 @@ class TestImageProviderRegistryAuth:
output = " ".join(
str(call.args[0]) for call in mock_print.call_args_list if call.args
)
assert "Basic auth" in output
assert "Docker login" in output
class TestExtractRegistry:
@@ -616,120 +697,42 @@ class TestExtractRegistry:
assert ImageProvider._extract_registry("nginx") is None
class TestTrivyAuthIntegration:
@patch("subprocess.run")
def test_run_scan_passes_trivy_env_with_credentials(self, mock_subprocess):
"""Test that run_scan() passes TRIVY_USERNAME/PASSWORD via env when credentials are set."""
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
provider = _make_provider(
images=["ghcr.io/user/image:tag"],
registry_username="myuser",
registry_password="mypass",
)
class TestIsRegistryUrl:
def test_registry_url_with_namespace(self):
assert ImageProvider._is_registry_url("docker.io/andoniaf") is True
reports = []
for batch in provider.run_scan():
reports.extend(batch)
def test_registry_url_ghcr(self):
assert ImageProvider._is_registry_url("ghcr.io/org") is True
calls = mock_subprocess.call_args_list
# Only trivy calls, no docker login/pull
assert all(call.args[0][0] == "trivy" for call in calls)
env = calls[0].kwargs.get("env") or calls[0][1].get("env")
assert env["TRIVY_USERNAME"] == "myuser"
assert env["TRIVY_PASSWORD"] == "mypass"
def test_image_ref_with_tag(self):
assert ImageProvider._is_registry_url("ghcr.io/user/image:tag") is False
@patch.dict(os.environ, {"REGISTRY_USERNAME": "", "REGISTRY_PASSWORD": ""})
@patch("subprocess.run")
def test_run_scan_no_trivy_auth_without_credentials(self, mock_subprocess):
"""Test that run_scan() does NOT set TRIVY_USERNAME/PASSWORD when no credentials."""
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
def test_image_ref_with_repo(self):
assert ImageProvider._is_registry_url("ghcr.io/user/image") is False
def test_dockerhub_short_image(self):
assert ImageProvider._is_registry_url("alpine:3.18") is False
def test_dockerhub_with_namespace(self):
assert ImageProvider._is_registry_url("andoniaf/test:tag") is False
def test_bare_image_name(self):
assert ImageProvider._is_registry_url("nginx") is False
def test_localhost_namespace(self):
assert ImageProvider._is_registry_url("localhost:5000/myns") is True
def test_localhost_image_with_tag(self):
assert ImageProvider._is_registry_url("localhost:5000/myns/image:v1") is False
class TestCleanup:
def test_cleanup_idempotent(self):
"""Test cleanup is safe to call multiple times."""
provider = _make_provider()
for batch in provider.run_scan():
pass
calls = mock_subprocess.call_args_list
assert all(call.args[0][0] == "trivy" for call in calls)
@patch.dict(os.environ, {"REGISTRY_USERNAME": "", "REGISTRY_PASSWORD": ""})
@patch("subprocess.run")
def test_run_scan_token_auth_via_env(self, mock_subprocess):
"""Test that run_scan() passes TRIVY_REGISTRY_TOKEN when only token is provided."""
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
provider = _make_provider(registry_token="my-token")
for batch in provider.run_scan():
pass
calls = mock_subprocess.call_args_list
assert all(call.args[0][0] == "trivy" for call in calls)
env = calls[0].kwargs.get("env") or calls[0][1].get("env")
assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
@patch("subprocess.run")
def test_run_with_credentials_only_calls_trivy(self, mock_subprocess):
"""Test that run() only calls trivy (no docker login/pull/logout)."""
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
provider = _make_provider(
images=["ghcr.io/user/image:tag"],
registry_username="myuser",
registry_password="mypass",
)
provider.run()
calls = mock_subprocess.call_args_list
assert all(call.args[0][0] == "trivy" for call in calls)
@patch("subprocess.run")
def test_run_scan_multiple_images_all_get_trivy_env(self, mock_subprocess):
"""Test that all trivy calls get TRIVY_USERNAME/PASSWORD when scanning multiple images."""
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
provider = _make_provider(
images=["ghcr.io/user/image1:tag", "ghcr.io/user/image2:tag"],
registry_username="myuser",
registry_password="mypass",
)
for batch in provider.run_scan():
pass
calls = mock_subprocess.call_args_list
trivy_calls = [c for c in calls if c.args[0][0] == "trivy"]
assert len(trivy_calls) == 2
for call in trivy_calls:
env = call.kwargs.get("env") or call[1].get("env")
assert env["TRIVY_USERNAME"] == "myuser"
assert env["TRIVY_PASSWORD"] == "mypass"
@patch("subprocess.run")
def test_test_connection_docker_hub_uses_trivy_auth(self, mock_subprocess):
"""Test test_connection passes TRIVY creds for Docker Hub images."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
result = ImageProvider.test_connection(
image="andoniaf/test-private:tag",
registry_username="myuser",
registry_password="mypass",
)
assert result.is_connected is True
assert mock_subprocess.call_count == 1
trivy_call = mock_subprocess.call_args
assert trivy_call.args[0][0] == "trivy"
env = trivy_call.kwargs.get("env") or trivy_call[1].get("env")
assert env["TRIVY_USERNAME"] == "myuser"
assert env["TRIVY_PASSWORD"] == "mypass"
provider.cleanup()
provider.cleanup()
class TestImageProviderInputValidation:
@@ -921,3 +924,67 @@ class TestImageProviderNameValidation:
with pytest.raises(ImageListFileReadError):
_make_provider(images=None, image_list_file=file_path)
class TestScanPerImage:
@patch("subprocess.run")
def test_yields_per_image(self, mock_subprocess):
"""Test that scan_per_image yields (name, findings) per image."""
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
provider = _make_provider(images=["alpine:3.18", "nginx:latest"])
results = list(provider.scan_per_image())
assert len(results) == 2
for name, findings in results:
assert isinstance(name, str)
assert isinstance(findings, list)
assert all(isinstance(f, CheckReportImage) for f in findings)
@patch("subprocess.run")
def test_reraises_scan_error(self, mock_subprocess):
"""Test that ImageScanError propagates from scan_per_image."""
mock_subprocess.return_value = MagicMock(
returncode=1, stdout="", stderr="scan failed"
)
provider = _make_provider(images=["alpine:3.18"])
with pytest.raises(ImageScanError):
list(provider.scan_per_image())
@patch("subprocess.run")
def test_skips_generic_error(self, mock_subprocess):
"""Test that a generic RuntimeError in _scan_single_image yields empty findings and continues."""
def side_effect(cmd, **kwargs):
if "bad:image" in cmd:
raise RuntimeError("unexpected error")
return MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
mock_subprocess.side_effect = side_effect
provider = _make_provider(images=["bad:image", "alpine:3.18"])
results = list(provider.scan_per_image())
assert len(results) == 2
assert results[0][0] == "bad:image"
assert results[0][1] == []
assert results[1][0] == "alpine:3.18"
assert len(results[1][1]) > 0
@patch("subprocess.run")
def test_calls_cleanup(self, mock_subprocess):
"""Test that cleanup is called even after scan_per_image completes."""
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
provider = _make_provider(images=["alpine:3.18"])
with mock.patch.object(provider, "cleanup") as mock_cleanup:
list(provider.scan_per_image())
mock_cleanup.assert_called_once()

View File

@@ -99,7 +99,7 @@ class TestDockerHubListTags:
class TestDockerHubLogin:
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_login_failure(self, mock_request):
resp = MagicMock(status_code=401)
resp = MagicMock(status_code=401, text="invalid credentials")
mock_request.return_value = resp
adapter = DockerHubAdapter("docker.io/myorg", username="bad", password="creds")
with pytest.raises(ImageRegistryAuthError, match="login failed"):
@@ -110,6 +110,29 @@ class TestDockerHubLogin:
adapter._hub_login() # Should not raise
assert adapter._hub_jwt is None
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_login_401_includes_response_body(self, mock_request):
resp = MagicMock(
status_code=401, text='{"detail":"Incorrect authentication credentials"}'
)
mock_request.return_value = resp
adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
with pytest.raises(
ImageRegistryAuthError, match="Incorrect authentication credentials"
):
adapter._hub_login()
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_login_500_retried_then_raises_network_error(
self, mock_request, mock_sleep
):
mock_request.return_value = MagicMock(status_code=500)
adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
with pytest.raises(ImageRegistryNetworkError, match="Server error"):
adapter._hub_login()
assert mock_request.call_count == 3
class TestDockerHubRetry:
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@@ -133,6 +156,63 @@ class TestDockerHubRetry:
adapter._request_with_retry("GET", "https://hub.docker.com")
assert mock_request.call_count == 3
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_retry_on_500(self, mock_request, mock_sleep):
resp_500 = MagicMock(status_code=500)
resp_200 = MagicMock(status_code=200)
mock_request.side_effect = [resp_500, resp_200]
adapter = DockerHubAdapter("docker.io/myorg")
result = adapter._request_with_retry("GET", "https://hub.docker.com")
assert result.status_code == 200
assert mock_request.call_count == 2
mock_sleep.assert_called_once()
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_retry_exhausted_on_500_raises_network_error(
self, mock_request, mock_sleep
):
mock_request.return_value = MagicMock(status_code=500)
adapter = DockerHubAdapter("docker.io/myorg")
with pytest.raises(
ImageRegistryNetworkError, match="Server error.*HTTP 500.*3 attempts"
):
adapter._request_with_retry("GET", "https://hub.docker.com")
assert mock_request.call_count == 3
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_4xx_not_retried(self, mock_request, mock_sleep):
mock_request.return_value = MagicMock(status_code=403)
adapter = DockerHubAdapter("docker.io/myorg")
result = adapter._request_with_retry("GET", "https://hub.docker.com")
assert result.status_code == 403
assert mock_request.call_count == 1
mock_sleep.assert_not_called()
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_request_sends_user_agent(self, mock_request):
mock_request.return_value = MagicMock(status_code=200)
adapter = DockerHubAdapter("docker.io/myorg")
adapter._request_with_retry("GET", "https://hub.docker.com")
_, kwargs = mock_request.call_args
from prowler.config.config import prowler_version
assert (
kwargs["headers"]["User-Agent"]
== f"Prowler/{prowler_version} (registry-adapter)"
)
@patch("prowler.providers.image.lib.registry.base.time.sleep")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_retry_500_includes_response_body(self, mock_request, mock_sleep):
resp_500 = MagicMock(status_code=500, text="<html>Cloudflare error</html>")
mock_request.return_value = resp_500
adapter = DockerHubAdapter("docker.io/myorg")
with pytest.raises(ImageRegistryNetworkError, match="Cloudflare error"):
adapter._request_with_retry("GET", "https://hub.docker.com")
class TestDockerHubEmptyTokens:
@patch("prowler.providers.image.lib.registry.base.requests.request")

View File

@@ -288,31 +288,33 @@ class TestOciAdapterRetry:
class TestOciAdapterNextPageUrl:
def test_no_link_header(self):
adapter = OciRegistryAdapter("reg.io")
resp = MagicMock(headers={})
assert adapter._next_page_url(resp) is None
assert OciRegistryAdapter._next_page_url(resp) is None
def test_link_header_with_next(self):
adapter = OciRegistryAdapter("reg.io")
resp = MagicMock(
headers={"Link": '<https://reg.io/v2/_catalog?n=200&last=b>; rel="next"'}
)
assert adapter._next_page_url(resp) == "https://reg.io/v2/_catalog?n=200&last=b"
assert (
OciRegistryAdapter._next_page_url(resp)
== "https://reg.io/v2/_catalog?n=200&last=b"
)
def test_link_header_relative_url(self):
resp = MagicMock(
headers={"Link": '</v2/_catalog?n=200&last=b>; rel="next"'},
url="https://reg.io/v2/_catalog?n=200",
)
assert (
OciRegistryAdapter._next_page_url(resp)
== "https://reg.io/v2/_catalog?n=200&last=b"
)
def test_link_header_no_next(self):
adapter = OciRegistryAdapter("reg.io")
resp = MagicMock(
headers={"Link": '<https://reg.io/v2/_catalog?n=200>; rel="prev"'}
)
assert adapter._next_page_url(resp) is None
def test_link_header_relative_url(self):
adapter = OciRegistryAdapter("reg.io")
resp = MagicMock(
url="https://reg.io/v2/_catalog?n=200",
headers={"Link": '</v2/_catalog?last=b&n=200>; rel="next"'},
)
assert adapter._next_page_url(resp) == "https://reg.io/v2/_catalog?last=b&n=200"
assert OciRegistryAdapter._next_page_url(resp) is None
class TestOciAdapterSSRF:

View File

@@ -152,16 +152,15 @@ class TestEmptyRegistry:
class TestRegistryList:
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_registry_list_prints_and_exits(self, mock_factory, capsys):
def test_registry_list_prints_and_returns(self, mock_factory, capsys):
adapter = MagicMock()
adapter.list_repositories.return_value = ["app/frontend", "app/backend"]
adapter.list_tags.side_effect = [["latest", "v1.0"], ["latest"]]
mock_factory.return_value = adapter
with pytest.raises(SystemExit) as exc_info:
_build_provider(registry_list_images=True)
provider = _build_provider(registry_list_images=True)
assert exc_info.value.code == 0
assert provider._listing_only is True
captured = capsys.readouterr()
assert "app/frontend" in captured.out
assert "app/backend" in captured.out
@@ -177,10 +176,9 @@ class TestRegistryList:
adapter.list_tags.return_value = ["latest"]
mock_factory.return_value = adapter
with pytest.raises(SystemExit) as exc_info:
_build_provider(registry_list_images=True, image_filter="^prod/")
provider = _build_provider(registry_list_images=True, image_filter="^prod/")
assert exc_info.value.code == 0
assert provider._listing_only is True
captured = capsys.readouterr()
assert "prod/app" in captured.out
assert "dev/app" not in captured.out
@@ -193,10 +191,9 @@ class TestRegistryList:
adapter.list_tags.return_value = ["latest", "v1.0", "dev-abc"]
mock_factory.return_value = adapter
with pytest.raises(SystemExit) as exc_info:
_build_provider(registry_list_images=True, tag_filter=r"^v\d+\.\d+$")
provider = _build_provider(registry_list_images=True, tag_filter=r"^v\d+\.\d+$")
assert exc_info.value.code == 0
assert provider._listing_only is True
captured = capsys.readouterr()
assert "v1.0" in captured.out
assert "dev-abc" not in captured.out
@@ -210,10 +207,9 @@ class TestRegistryList:
mock_factory.return_value = adapter
# max_images=1 would normally raise, but --registry-list skips it
with pytest.raises(SystemExit) as exc_info:
_build_provider(registry_list_images=True, max_images=1)
provider = _build_provider(registry_list_images=True, max_images=1)
assert exc_info.value.code == 0
assert provider._listing_only is True
captured = capsys.readouterr()
assert "6 images" in captured.out

View File

@@ -6,6 +6,7 @@ All notable changes to the **Prowler UI** are documented in this file.
### 🚀 Added
- Image (Container Registry) provider support in UI: badge icon, credentials form, and provider-type filtering
- OpenStack provider support in the UI [(#10046)](https://github.com/prowler-cloud/prowler/pull/10046)
- PDF report available for the CSA CCM compliance framework [(#10088)](https://github.com/prowler-cloud/prowler/pull/10088)
- CSV and PDF download buttons in compliance views [(#10093)](https://github.com/prowler-cloud/prowler/pull/10093)

View File

@@ -10,6 +10,7 @@ import {
GCPProviderBadge,
GitHubProviderBadge,
IacProviderBadge,
ImageProviderBadge,
KS8ProviderBadge,
M365ProviderBadge,
MongoDBAtlasProviderBadge,
@@ -34,6 +35,7 @@ const PROVIDER_ICON: Record<ProviderType, ReactNode> = {
m365: <M365ProviderBadge width={18} height={18} />,
github: <GitHubProviderBadge width={18} height={18} />,
iac: <IacProviderBadge width={18} height={18} />,
image: <ImageProviderBadge width={18} height={18} />,
oraclecloud: <OracleCloudProviderBadge width={18} height={18} />,
mongodbatlas: <MongoDBAtlasProviderBadge width={18} height={18} />,
alibabacloud: <AlibabaCloudProviderBadge width={18} height={18} />,

View File

@@ -1,7 +1,7 @@
"use client";
import { useSearchParams } from "next/navigation";
import { lazy, Suspense } from "react";
import { type ComponentType, lazy, Suspense } from "react";
import {
MultiSelect,
@@ -48,6 +48,11 @@ const IacProviderBadge = lazy(() =>
default: m.IacProviderBadge,
})),
);
const ImageProviderBadge = lazy(() =>
import("@/components/icons/providers-badge").then((m) => ({
default: m.ImageProviderBadge,
})),
);
const OracleCloudProviderBadge = lazy(() =>
import("@/components/icons/providers-badge").then((m) => ({
default: m.OracleCloudProviderBadge,
@@ -77,7 +82,7 @@ const IconPlaceholder = ({ width, height }: IconProps) => (
const PROVIDER_DATA: Record<
ProviderType,
{ label: string; icon: React.ComponentType<IconProps> }
{ label: string; icon: ComponentType<IconProps> }
> = {
aws: {
label: "Amazon Web Services",
@@ -107,6 +112,10 @@ const PROVIDER_DATA: Record<
label: "Infrastructure as Code",
icon: IacProviderBadge,
},
image: {
label: "Container Registry",
icon: ImageProviderBadge,
},
oraclecloud: {
label: "Oracle Cloud Infrastructure",
icon: OracleCloudProviderBadge,

View File

@@ -5,6 +5,7 @@ import {
GCPProviderBadge,
GitHubProviderBadge,
IacProviderBadge,
ImageProviderBadge,
KS8ProviderBadge,
M365ProviderBadge,
MongoDBAtlasProviderBadge,
@@ -84,6 +85,15 @@ export const CustomProviderInputIac = () => {
);
};
export const CustomProviderInputImage = () => {
return (
<div className="flex items-center gap-x-2">
<ImageProviderBadge width={25} height={25} />
<p className="text-sm">Container Registry</p>
</div>
);
};
export const CustomProviderInputOracleCloud = () => {
return (
<div className="flex items-center gap-x-2">

View File

@@ -5,6 +5,7 @@ import {
GCPProviderBadge,
GitHubProviderBadge,
IacProviderBadge,
ImageProviderBadge,
KS8ProviderBadge,
M365ProviderBadge,
MongoDBAtlasProviderBadge,
@@ -21,6 +22,7 @@ export const PROVIDER_ICONS = {
m365: M365ProviderBadge,
github: GitHubProviderBadge,
iac: IacProviderBadge,
image: ImageProviderBadge,
oraclecloud: OracleCloudProviderBadge,
mongodbatlas: MongoDBAtlasProviderBadge,
alibabacloud: AlibabaCloudProviderBadge,

View File

@@ -0,0 +1,36 @@
import { FC } from "react";
import { IconSvgProps } from "@/types";
export const ImageProviderBadge: FC<IconSvgProps> = ({
size,
width,
height,
...props
}) => (
<svg
xmlns="http://www.w3.org/2000/svg"
aria-hidden="true"
fill="none"
focusable="false"
height={size || height}
role="presentation"
viewBox="0 0 256 256"
width={size || width}
{...props}
>
<rect width="256" height="256" fill="#1c1917" rx="60" />
<g
transform="translate(20, 20) scale(9)"
fill="none"
stroke="#fff"
strokeWidth="2"
strokeLinecap="round"
strokeLinejoin="round"
>
<path d="M12.89 1.45L21 5.75V18.25L12.89 22.55C12.33 22.84 11.67 22.84 11.11 22.55L3 18.25V5.75L11.11 1.45C11.67 1.16 12.33 1.16 12.89 1.45Z" />
<path d="M3.5 6L12 10.5L20.5 6" />
<path d="M12 22.5V10.5" />
</g>
</svg>
);

View File

@@ -8,6 +8,7 @@ import { AzureProviderBadge } from "./azure-provider-badge";
import { GCPProviderBadge } from "./gcp-provider-badge";
import { GitHubProviderBadge } from "./github-provider-badge";
import { IacProviderBadge } from "./iac-provider-badge";
import { ImageProviderBadge } from "./image-provider-badge";
import { KS8ProviderBadge } from "./ks8-provider-badge";
import { M365ProviderBadge } from "./m365-provider-badge";
import { MongoDBAtlasProviderBadge } from "./mongodbatlas-provider-badge";
@@ -21,6 +22,7 @@ export {
GCPProviderBadge,
GitHubProviderBadge,
IacProviderBadge,
ImageProviderBadge,
KS8ProviderBadge,
M365ProviderBadge,
MongoDBAtlasProviderBadge,
@@ -37,6 +39,7 @@ export const PROVIDER_ICONS: Record<string, FC<IconSvgProps>> = {
"Microsoft 365": M365ProviderBadge,
GitHub: GitHubProviderBadge,
"Infrastructure as Code": IacProviderBadge,
"Container Registry": ImageProviderBadge,
"Oracle Cloud Infrastructure": OracleCloudProviderBadge,
"MongoDB Atlas": MongoDBAtlasProviderBadge,
"Alibaba Cloud": AlibabaCloudProviderBadge,

View File

@@ -15,6 +15,7 @@ import {
GCPProviderBadge,
GitHubProviderBadge,
IacProviderBadge,
ImageProviderBadge,
KS8ProviderBadge,
M365ProviderBadge,
MongoDBAtlasProviderBadge,
@@ -64,6 +65,11 @@ const PROVIDERS = [
label: "Infrastructure as Code",
badge: IacProviderBadge,
},
{
value: "image",
label: "Container Registry",
badge: ImageProviderBadge,
},
{
value: "oraclecloud",
label: "Oracle Cloud Infrastructure",

View File

@@ -20,6 +20,7 @@ import {
GCPDefaultCredentials,
GCPServiceAccountKey,
IacCredentials,
ImageCredentials,
KubernetesCredentials,
M365CertificateCredentials,
M365ClientSecretCredentials,
@@ -45,6 +46,7 @@ import {
import { AzureCredentialsForm } from "./via-credentials/azure-credentials-form";
import { GitHubCredentialsForm } from "./via-credentials/github-credentials-form";
import { IacCredentialsForm } from "./via-credentials/iac-credentials-form";
import { ImageCredentialsForm } from "./via-credentials/image-credentials-form";
import { KubernetesCredentialsForm } from "./via-credentials/k8s-credentials-form";
import { MongoDBAtlasCredentialsForm } from "./via-credentials/mongodbatlas-credentials-form";
import { OpenStackCredentialsForm } from "./via-credentials/openstack-credentials-form";
@@ -180,6 +182,11 @@ export const BaseCredentialsForm = ({
control={form.control as unknown as Control<IacCredentials>}
/>
)}
{providerType === "image" && (
<ImageCredentialsForm
control={form.control as unknown as Control<ImageCredentials>}
/>
)}
{providerType === "oraclecloud" && (
<OracleCloudCredentialsForm
control={form.control as unknown as Control<OCICredentials>}

View File

@@ -57,6 +57,11 @@ const getProviderFieldDetails = (providerType?: ProviderType) => {
label: "Repository URL",
placeholder: "e.g. https://github.com/user/repo",
};
case "image":
return {
label: "Registry URL",
placeholder: "e.g. https://registry.example.com",
};
case "oraclecloud":
return {
label: "Tenancy OCID",

View File

@@ -0,0 +1,83 @@
import { Control } from "react-hook-form";
import { CustomInput } from "@/components/ui/custom";
import { ImageCredentials } from "@/types";
export const ImageCredentialsForm = ({
control,
}: {
control: Control<ImageCredentials>;
}) => {
return (
<>
<div className="flex flex-col">
<div className="text-md text-default-foreground leading-9 font-bold">
Connect via Registry Credentials
</div>
<div className="text-default-500 text-sm">
Provide registry credentials to authenticate with your container
registry (all fields are optional).
</div>
</div>
<CustomInput
control={control}
name="registry_username"
label="Registry Username (Optional)"
labelPlacement="inside"
placeholder="Username for registry authentication"
variant="bordered"
type="text"
isRequired={false}
/>
<CustomInput
control={control}
name="registry_password"
label="Registry Password (Optional)"
labelPlacement="inside"
placeholder="Password for registry authentication"
variant="bordered"
type="password"
isRequired={false}
/>
<CustomInput
control={control}
name="registry_token"
label="Registry Token (Optional)"
labelPlacement="inside"
placeholder="Token for registry authentication"
variant="bordered"
type="password"
isRequired={false}
/>
<div className="flex flex-col pt-2">
<div className="text-md text-default-foreground leading-9 font-bold">
Scan Scope
</div>
<div className="text-default-500 text-sm">
Limit which repositories and tags are scanned using regex patterns.
</div>
</div>
<CustomInput
control={control}
name="image_filter"
label="Image Filter (Optional)"
labelPlacement="inside"
placeholder="e.g. ^prod/.*"
variant="bordered"
type="text"
isRequired={false}
/>
<CustomInput
control={control}
name="tag_filter"
label="Tag Filter (Optional)"
labelPlacement="inside"
placeholder="e.g. ^(latest|v\d+\.\d+\.\d+)$"
variant="bordered"
type="text"
isRequired={false}
/>
</>
);
};

View File

@@ -1,6 +1,7 @@
export * from "./azure-credentials-form";
export * from "./github-credentials-form";
export * from "./iac-credentials-form";
export * from "./image-credentials-form";
export * from "./k8s-credentials-form";
export * from "./mongodbatlas-credentials-form";
export * from "./openstack-credentials-form";

View File

@@ -5,6 +5,7 @@ import {
GCPProviderBadge,
GitHubProviderBadge,
IacProviderBadge,
ImageProviderBadge,
KS8ProviderBadge,
M365ProviderBadge,
MongoDBAtlasProviderBadge,
@@ -29,6 +30,8 @@ export const getProviderLogo = (provider: ProviderType) => {
return <GitHubProviderBadge width={35} height={35} />;
case "iac":
return <IacProviderBadge width={35} height={35} />;
case "image":
return <ImageProviderBadge width={35} height={35} />;
case "oraclecloud":
return <OracleCloudProviderBadge width={35} height={35} />;
case "mongodbatlas":
@@ -58,6 +61,8 @@ export const getProviderName = (provider: ProviderType): string => {
return "GitHub";
case "iac":
return "Infrastructure as Code";
case "image":
return "Container Registry";
case "oraclecloud":
return "Oracle Cloud Infrastructure";
case "mongodbatlas":

View File

@@ -43,6 +43,11 @@ export const getProviderHelpText = (provider: string) => {
text: "Need help scanning your Infrastructure as Code repository?",
link: "https://goto.prowler.com/provider-iac",
};
case "image":
return {
text: "Need help scanning your container registry?",
link: "https://goto.prowler.com/provider-image",
};
case "oraclecloud":
return {
text: "Need help connecting your Oracle Cloud account?",

View File

@@ -278,6 +278,32 @@ export const buildIacSecret = (formData: FormData) => {
return filterEmptyValues(secret);
};
export const buildImageSecret = (formData: FormData) => {
const secret = {
[ProviderCredentialFields.REGISTRY_USERNAME]: getFormValue(
formData,
ProviderCredentialFields.REGISTRY_USERNAME,
),
[ProviderCredentialFields.REGISTRY_PASSWORD]: getFormValue(
formData,
ProviderCredentialFields.REGISTRY_PASSWORD,
),
[ProviderCredentialFields.REGISTRY_TOKEN]: getFormValue(
formData,
ProviderCredentialFields.REGISTRY_TOKEN,
),
[ProviderCredentialFields.IMAGE_FILTER]: getFormValue(
formData,
ProviderCredentialFields.IMAGE_FILTER,
),
[ProviderCredentialFields.TAG_FILTER]: getFormValue(
formData,
ProviderCredentialFields.TAG_FILTER,
),
};
return filterEmptyValues(secret);
};
/**
* Utility function to safely encode a string to base64
* Handles UTF-8 characters properly without using deprecated APIs
@@ -371,6 +397,10 @@ export const buildSecretConfig = (
secretType: "static",
secret: buildIacSecret(formData),
}),
image: () => ({
secretType: "static",
secret: buildImageSecret(formData),
}),
oraclecloud: () => ({
secretType: "static",
secret: buildOracleCloudSecret(formData, providerUid),

View File

@@ -53,6 +53,13 @@ export const ProviderCredentialFields = {
REPOSITORY_URL: "repository_url",
ACCESS_TOKEN: "access_token",
// Image (Container Registry) fields
REGISTRY_USERNAME: "registry_username",
REGISTRY_PASSWORD: "registry_password",
REGISTRY_TOKEN: "registry_token",
IMAGE_FILTER: "image_filter",
TAG_FILTER: "tag_filter",
// OCI fields
OCI_USER: "user",
OCI_FINGERPRINT: "fingerprint",
@@ -101,6 +108,11 @@ export const ErrorPointers = {
GITHUB_APP_KEY: "/data/attributes/secret/github_app_key_content",
REPOSITORY_URL: "/data/attributes/secret/repository_url",
ACCESS_TOKEN: "/data/attributes/secret/access_token",
REGISTRY_USERNAME: "/data/attributes/secret/registry_username",
REGISTRY_PASSWORD: "/data/attributes/secret/registry_password",
REGISTRY_TOKEN: "/data/attributes/secret/registry_token",
IMAGE_FILTER: "/data/attributes/secret/image_filter",
TAG_FILTER: "/data/attributes/secret/tag_filter",
CERTIFICATE_CONTENT: "/data/attributes/secret/certificate_content",
OCI_USER: "/data/attributes/secret/user",
OCI_FINGERPRINT: "/data/attributes/secret/fingerprint",

View File

@@ -304,6 +304,15 @@ export type IacCredentials = {
[ProviderCredentialFields.PROVIDER_ID]: string;
};
export type ImageCredentials = {
[ProviderCredentialFields.REGISTRY_USERNAME]?: string;
[ProviderCredentialFields.REGISTRY_PASSWORD]?: string;
[ProviderCredentialFields.REGISTRY_TOKEN]?: string;
[ProviderCredentialFields.IMAGE_FILTER]?: string;
[ProviderCredentialFields.TAG_FILTER]?: string;
[ProviderCredentialFields.PROVIDER_ID]: string;
};
export type OCICredentials = {
[ProviderCredentialFields.OCI_USER]: string;
[ProviderCredentialFields.OCI_FINGERPRINT]: string;
@@ -348,6 +357,7 @@ export type CredentialsFormSchema =
| GCPServiceAccountKey
| KubernetesCredentials
| IacCredentials
| ImageCredentials
| M365Credentials
| OCICredentials
| MongoDBAtlasCredentials

View File

@@ -115,6 +115,11 @@ export const addProviderFormSchema = z
[ProviderCredentialFields.PROVIDER_ALIAS]: z.string(),
providerUid: z.string(),
}),
z.object({
providerType: z.literal("image"),
[ProviderCredentialFields.PROVIDER_ALIAS]: z.string(),
providerUid: z.string(),
}),
z.object({
providerType: z.literal("oraclecloud"),
[ProviderCredentialFields.PROVIDER_ALIAS]: z.string(),
@@ -264,16 +269,37 @@ export const addCredentialsFormSchema = (
.string()
.min(1, "Access Key Secret is required"),
}
: providerType === "openstack"
: providerType === "image"
? {
[ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CONTENT]:
z
.string()
.min(1, "Clouds YAML content is required"),
[ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CLOUD]:
z.string().min(1, "Cloud name is required"),
[ProviderCredentialFields.REGISTRY_USERNAME]: z
.string()
.optional(),
[ProviderCredentialFields.REGISTRY_PASSWORD]: z
.string()
.optional(),
[ProviderCredentialFields.REGISTRY_TOKEN]: z
.string()
.optional(),
[ProviderCredentialFields.IMAGE_FILTER]: z
.string()
.optional(),
[ProviderCredentialFields.TAG_FILTER]: z
.string()
.optional(),
}
: {}),
: providerType === "openstack"
? {
[ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CONTENT]:
z
.string()
.min(
1,
"Clouds YAML content is required",
),
[ProviderCredentialFields.OPENSTACK_CLOUDS_YAML_CLOUD]:
z.string().min(1, "Cloud name is required"),
}
: {}),
})
.superRefine((data: Record<string, string | undefined>, ctx) => {
if (providerType === "m365") {

View File

@@ -7,6 +7,7 @@ export const PROVIDER_TYPES = [
"mongodbatlas",
"github",
"iac",
"image",
"oraclecloud",
"alibabacloud",
"openstack",
@@ -23,6 +24,7 @@ export const PROVIDER_DISPLAY_NAMES: Record<ProviderType, string> = {
mongodbatlas: "MongoDB Atlas",
github: "GitHub",
iac: "Infrastructure as Code",
image: "Container Registry",
oraclecloud: "Oracle Cloud Infrastructure",
alibabacloud: "Alibaba Cloud",
openstack: "OpenStack",