feat(image): add image provider to UI (#10167)

Co-authored-by: alejandrobailo <alejandrobailo94@gmail.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Alejandro Bailo <59607668+alejandrobailo@users.noreply.github.com>
This commit is contained in:
Andoni Alonso
2026-03-17 10:53:37 +01:00
committed by GitHub
parent 887a20f06e
commit 451071d694
25 changed files with 551 additions and 61 deletions

View File

@@ -53,7 +53,7 @@ class TestImageProvider:
assert provider._type == "image"
assert provider.type == "image"
assert provider.images == ["alpine:3.18"]
assert provider.scanners == ["vuln", "secret"]
assert provider.scanners == ["vuln", "secret", "misconfig"]
assert provider.image_config_scanners == []
assert provider.trivy_severity == []
assert provider.ignore_unfixed is False
@@ -698,8 +698,118 @@ class TestExtractRegistry:
class TestIsRegistryUrl:
def test_registry_url_with_namespace(self):
assert ImageProvider._is_registry_url("docker.io/andoniaf") is True
def test_bare_ecr_hostname(self):
assert ImageProvider._is_registry_url(
"714274078102.dkr.ecr.eu-west-1.amazonaws.com"
)
def test_bare_hostname_with_port(self):
assert ImageProvider._is_registry_url("myregistry.com:5000")
def test_bare_ghcr(self):
assert ImageProvider._is_registry_url("ghcr.io")
def test_registry_with_namespace_only(self):
"""Registry URL with a single path segment (no tag) is a registry URL."""
assert ImageProvider._is_registry_url("ghcr.io/myorg")
def test_image_reference_not_registry(self):
"""Full image reference with repo and tag is not a registry URL."""
assert not ImageProvider._is_registry_url("ghcr.io/myorg/repo:tag")
def test_simple_image_name(self):
assert not ImageProvider._is_registry_url("alpine:3.18")
def test_bare_image_no_tag(self):
assert not ImageProvider._is_registry_url("nginx")
def test_dockerhub_namespace(self):
assert not ImageProvider._is_registry_url("library/alpine")
class TestTestRegistryConnection:
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_registry_connection_success(self, mock_factory):
"""Test that a bare hostname triggers registry catalog test."""
mock_adapter = MagicMock()
mock_adapter.list_repositories.return_value = ["repo1"]
mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(
image="714274078102.dkr.ecr.eu-west-1.amazonaws.com",
registry_username="user",
registry_password="pass",
)
assert result.is_connected is True
mock_factory.assert_called_once_with(
registry_url="714274078102.dkr.ecr.eu-west-1.amazonaws.com",
username="user",
password="pass",
token=None,
)
mock_adapter.list_repositories.assert_called_once()
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_registry_connection_auth_failure(self, mock_factory):
"""Test that 401 from registry adapter returns auth failure."""
mock_adapter = MagicMock()
mock_adapter.list_repositories.side_effect = Exception("401 unauthorized")
mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(
image="714274078102.dkr.ecr.eu-west-1.amazonaws.com",
)
assert result.is_connected is False
assert "Authentication failed" in result.error
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_registry_connection_generic_error(self, mock_factory):
"""Test that a generic error from registry adapter returns error message."""
mock_adapter = MagicMock()
mock_adapter.list_repositories.side_effect = Exception("connection refused")
mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(
image="myregistry.example.com",
)
assert result.is_connected is False
assert "Failed to connect to registry" in result.error
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_image_reference_uses_registry_adapter(self, mock_factory):
"""Test that a full image reference uses registry adapter to verify tag."""
mock_adapter = MagicMock()
mock_adapter.list_tags.return_value = ["3.18", "latest"]
mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(image="alpine:3.18")
assert result.is_connected is True
mock_adapter.list_tags.assert_called_once()
class TestTrivyAuthIntegration:
@patch("subprocess.run")
def test_run_scan_passes_trivy_env_with_credentials(self, mock_subprocess):
"""Test that run_scan() passes TRIVY_USERNAME/PASSWORD via env when credentials are set."""
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
provider = _make_provider(
images=["ghcr.io/user/image:tag"],
registry_username="myuser",
registry_password="mypass",
)
list(provider.run_scan())
call_kwargs = mock_subprocess.call_args
env = call_kwargs.kwargs.get("env") or call_kwargs[1].get("env")
assert env["TRIVY_USERNAME"] == "myuser"
assert env["TRIVY_PASSWORD"] == "mypass"
def test_registry_url_ghcr(self):
assert ImageProvider._is_registry_url("ghcr.io/org") is True
@@ -734,6 +844,16 @@ class TestCleanup:
provider.cleanup()
provider.cleanup()
def test_cleanup_removes_trivy_cache_dir(self):
"""Test that cleanup removes the temporary Trivy cache directory."""
provider = _make_provider()
cache_dir = provider._trivy_cache_dir
assert os.path.isdir(cache_dir)
provider.cleanup()
assert not os.path.isdir(cache_dir)
class TestImageProviderInputValidation:
def test_invalid_timeout_format_raises_error(self):
@@ -804,6 +924,22 @@ class TestImageProviderInputValidation:
with pytest.raises(ImageInvalidConfigScannerError):
_make_provider(image_config_scanners=["misconfig", "vuln"])
@patch("subprocess.run")
def test_trivy_command_includes_cache_dir(self, mock_subprocess):
"""Test that Trivy command includes --cache-dir for cache isolation."""
provider = _make_provider()
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_empty_trivy_output(), stderr=""
)
for _ in provider._scan_single_image("alpine:3.18"):
pass
call_args = mock_subprocess.call_args[0][0]
assert "--cache-dir" in call_args
idx = call_args.index("--cache-dir")
assert call_args[idx + 1] == provider._trivy_cache_dir
@patch("subprocess.run")
def test_trivy_command_includes_image_config_scanners(self, mock_subprocess):
"""Test that Trivy command includes --image-config-scanners when set."""