mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-03-22 03:08:23 +00:00
feat(api): add Image provider support for container image scanning (#10128)
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
import sys
|
||||
from io import StringIO
|
||||
|
||||
from mock import patch
|
||||
from mock import MagicMock, patch
|
||||
|
||||
from prowler.config.config import prowler_version, timestamp
|
||||
from prowler.lib.logger import logger
|
||||
@@ -350,6 +350,62 @@ mongodbatlas_html_assessment_summary = """
|
||||
</div>
|
||||
</div>"""
|
||||
|
||||
image_registry_html_assessment_summary = """
|
||||
<div class="col-md-2">
|
||||
<div class="card">
|
||||
<div class="card-header">
|
||||
Image Assessment Summary
|
||||
</div>
|
||||
<ul class="list-group
|
||||
list-group-flush">
|
||||
<li class="list-group-item">
|
||||
<b>Registry URL:</b> myregistry.io
|
||||
</li>
|
||||
</ul>
|
||||
</div>
|
||||
</div>
|
||||
<div class="col-md-4">
|
||||
<div class="card">
|
||||
<div class="card-header">
|
||||
Image Credentials
|
||||
</div>
|
||||
<ul class="list-group
|
||||
list-group-flush">
|
||||
<li class="list-group-item">
|
||||
<b>Image authentication method:</b> Docker login
|
||||
</li>
|
||||
</ul>
|
||||
</div>
|
||||
</div>"""
|
||||
|
||||
image_list_html_assessment_summary = """
|
||||
<div class="col-md-2">
|
||||
<div class="card">
|
||||
<div class="card-header">
|
||||
Image Assessment Summary
|
||||
</div>
|
||||
<ul class="list-group
|
||||
list-group-flush">
|
||||
<li class="list-group-item">
|
||||
<b>Images:</b> nginx:latest, alpine:3.18
|
||||
</li>
|
||||
</ul>
|
||||
</div>
|
||||
</div>
|
||||
<div class="col-md-4">
|
||||
<div class="card">
|
||||
<div class="card-header">
|
||||
Image Credentials
|
||||
</div>
|
||||
<ul class="list-group
|
||||
list-group-flush">
|
||||
<li class="list-group-item">
|
||||
<b>Image authentication method:</b> No auth
|
||||
</li>
|
||||
</ul>
|
||||
</div>
|
||||
</div>"""
|
||||
|
||||
|
||||
def get_aws_html_header(args: list) -> str:
|
||||
"""
|
||||
@@ -854,6 +910,36 @@ class TestHTML:
|
||||
|
||||
assert summary == mongodbatlas_html_assessment_summary
|
||||
|
||||
def test_image_get_assessment_summary_with_registry(self):
|
||||
"""Test Image HTML assessment summary with registry URL."""
|
||||
findings = [generate_finding_output()]
|
||||
output = HTML(findings)
|
||||
|
||||
provider = MagicMock()
|
||||
provider.type = "image"
|
||||
provider.registry = "myregistry.io"
|
||||
provider.images = ["nginx:latest", "alpine:3.18"]
|
||||
provider.auth_method = "Docker login"
|
||||
|
||||
summary = output.get_assessment_summary(provider)
|
||||
|
||||
assert summary == image_registry_html_assessment_summary
|
||||
|
||||
def test_image_get_assessment_summary_with_images(self):
|
||||
"""Test Image HTML assessment summary with image list."""
|
||||
findings = [generate_finding_output()]
|
||||
output = HTML(findings)
|
||||
|
||||
provider = MagicMock()
|
||||
provider.type = "image"
|
||||
provider.registry = None
|
||||
provider.images = ["nginx:latest", "alpine:3.18"]
|
||||
provider.auth_method = "No auth"
|
||||
|
||||
summary = output.get_assessment_summary(provider)
|
||||
|
||||
assert summary == image_list_html_assessment_summary
|
||||
|
||||
def test_process_markdown_bold_text(self):
|
||||
"""Test that **text** is converted to <strong>text</strong>"""
|
||||
test_text = "This is **bold text** and this is **also bold**"
|
||||
|
||||
@@ -45,8 +45,16 @@ SAMPLE_UNKNOWN_SEVERITY_FINDING = {
|
||||
"Description": "An issue with unknown severity.",
|
||||
}
|
||||
|
||||
# Sample image SHA for testing (first 12 chars of a sha256 digest)
|
||||
SAMPLE_IMAGE_SHA = "c1aabb73d233"
|
||||
SAMPLE_IMAGE_ID = f"sha256:{SAMPLE_IMAGE_SHA}abcdef1234567890"
|
||||
|
||||
# Full Trivy JSON output structure with a single vulnerability
|
||||
SAMPLE_TRIVY_IMAGE_OUTPUT = {
|
||||
"Metadata": {
|
||||
"ImageID": SAMPLE_IMAGE_ID,
|
||||
"RepoDigests": [f"alpine@sha256:{SAMPLE_IMAGE_SHA}abcdef1234567890"],
|
||||
},
|
||||
"Results": [
|
||||
{
|
||||
"Target": "alpine:3.18 (alpine 3.18.0)",
|
||||
@@ -55,11 +63,15 @@ SAMPLE_TRIVY_IMAGE_OUTPUT = {
|
||||
"Secrets": [],
|
||||
"Misconfigurations": [],
|
||||
}
|
||||
]
|
||||
],
|
||||
}
|
||||
|
||||
# Full Trivy JSON output with mixed finding types
|
||||
SAMPLE_TRIVY_MULTI_TYPE_OUTPUT = {
|
||||
"Metadata": {
|
||||
"ImageID": SAMPLE_IMAGE_ID,
|
||||
"RepoDigests": [f"myimage@sha256:{SAMPLE_IMAGE_SHA}abcdef1234567890"],
|
||||
},
|
||||
"Results": [
|
||||
{
|
||||
"Target": "myimage:latest (debian 12)",
|
||||
@@ -68,7 +80,36 @@ SAMPLE_TRIVY_MULTI_TYPE_OUTPUT = {
|
||||
"Secrets": [SAMPLE_SECRET_FINDING],
|
||||
"Misconfigurations": [SAMPLE_MISCONFIGURATION_FINDING],
|
||||
}
|
||||
]
|
||||
],
|
||||
}
|
||||
|
||||
# Trivy output with only RepoDigests (no ImageID) for fallback testing
|
||||
SAMPLE_TRIVY_REPO_DIGEST_ONLY_OUTPUT = {
|
||||
"Metadata": {
|
||||
"RepoDigests": ["alpine@sha256:e5f6g7h8i9j0abcdef1234567890"],
|
||||
},
|
||||
"Results": [
|
||||
{
|
||||
"Target": "alpine:3.18 (alpine 3.18.0)",
|
||||
"Type": "alpine",
|
||||
"Vulnerabilities": [SAMPLE_VULNERABILITY_FINDING],
|
||||
"Secrets": [],
|
||||
"Misconfigurations": [],
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
# Trivy output with no Metadata at all
|
||||
SAMPLE_TRIVY_NO_METADATA_OUTPUT = {
|
||||
"Results": [
|
||||
{
|
||||
"Target": "alpine:3.18 (alpine 3.18.0)",
|
||||
"Type": "alpine",
|
||||
"Vulnerabilities": [SAMPLE_VULNERABILITY_FINDING],
|
||||
"Secrets": [],
|
||||
"Misconfigurations": [],
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
@@ -90,3 +131,13 @@ def get_invalid_trivy_output():
|
||||
def get_multi_type_trivy_output():
|
||||
"""Return Trivy output with multiple finding types as string."""
|
||||
return json.dumps(SAMPLE_TRIVY_MULTI_TYPE_OUTPUT)
|
||||
|
||||
|
||||
def get_repo_digest_only_trivy_output():
|
||||
"""Return Trivy output with only RepoDigests (no ImageID) as string."""
|
||||
return json.dumps(SAMPLE_TRIVY_REPO_DIGEST_ONLY_OUTPUT)
|
||||
|
||||
|
||||
def get_no_metadata_trivy_output():
|
||||
"""Return Trivy output with no Metadata as string."""
|
||||
return json.dumps(SAMPLE_TRIVY_NO_METADATA_OUTPUT)
|
||||
|
||||
@@ -15,11 +15,13 @@ from prowler.providers.image.exceptions.exceptions import (
|
||||
ImageListFileNotFoundError,
|
||||
ImageListFileReadError,
|
||||
ImageNoImagesProvidedError,
|
||||
ImageRegistryAuthError,
|
||||
ImageScanError,
|
||||
ImageTrivyBinaryNotFoundError,
|
||||
)
|
||||
from prowler.providers.image.image_provider import ImageProvider
|
||||
from tests.providers.image.image_fixtures import (
|
||||
SAMPLE_IMAGE_SHA,
|
||||
SAMPLE_MISCONFIGURATION_FINDING,
|
||||
SAMPLE_SECRET_FINDING,
|
||||
SAMPLE_UNKNOWN_SEVERITY_FINDING,
|
||||
@@ -27,6 +29,8 @@ from tests.providers.image.image_fixtures import (
|
||||
get_empty_trivy_output,
|
||||
get_invalid_trivy_output,
|
||||
get_multi_type_trivy_output,
|
||||
get_no_metadata_trivy_output,
|
||||
get_repo_digest_only_trivy_output,
|
||||
get_sample_trivy_json_output,
|
||||
)
|
||||
|
||||
@@ -42,10 +46,6 @@ def _make_provider(**kwargs):
|
||||
|
||||
|
||||
class TestImageProvider:
|
||||
@patch.dict(
|
||||
os.environ,
|
||||
{"REGISTRY_USERNAME": "", "REGISTRY_PASSWORD": "", "REGISTRY_TOKEN": ""},
|
||||
)
|
||||
def test_image_provider(self):
|
||||
"""Test default initialization."""
|
||||
provider = _make_provider()
|
||||
@@ -124,22 +124,27 @@ class TestImageProvider:
|
||||
provider = _make_provider()
|
||||
report = provider._process_finding(
|
||||
SAMPLE_VULNERABILITY_FINDING,
|
||||
"alpine:3.18",
|
||||
"alpine:3.18 (alpine 3.18.0)",
|
||||
"alpine",
|
||||
image_sha="c1aabb73d233",
|
||||
)
|
||||
|
||||
assert isinstance(report, CheckReportImage)
|
||||
assert report.status == "FAIL"
|
||||
assert report.check_metadata.CheckID == "CVE-2024-1234"
|
||||
assert report.check_metadata.Severity == "high"
|
||||
assert report.check_metadata.ServiceName == "alpine"
|
||||
assert report.check_metadata.ServiceName == "container-image"
|
||||
assert report.check_metadata.ResourceType == "container-image"
|
||||
assert report.check_metadata.ResourceGroup == "container"
|
||||
assert report.package_name == "openssl"
|
||||
assert report.installed_version == "1.1.1k-r0"
|
||||
assert report.fixed_version == "1.1.1l-r0"
|
||||
assert report.resource_name == "alpine:3.18 (alpine 3.18.0)"
|
||||
assert report.resource_name == "alpine:3.18"
|
||||
assert report.image_sha == "c1aabb73d233"
|
||||
assert report.resource_details == "alpine:3.18 (alpine 3.18.0)"
|
||||
assert report.region == "container"
|
||||
assert report.check_metadata.Categories == ["vulnerability"]
|
||||
assert report.check_metadata.RelatedUrl == ""
|
||||
|
||||
def test_process_finding_secret(self):
|
||||
"""Test processing a secret finding (identified by RuleID)."""
|
||||
@@ -147,14 +152,15 @@ class TestImageProvider:
|
||||
report = provider._process_finding(
|
||||
SAMPLE_SECRET_FINDING,
|
||||
"myimage:latest",
|
||||
"secret",
|
||||
"myimage:latest (debian 12)",
|
||||
)
|
||||
|
||||
assert isinstance(report, CheckReportImage)
|
||||
assert report.status == "FAIL"
|
||||
assert report.check_metadata.CheckID == "aws-access-key-id"
|
||||
assert report.check_metadata.Severity == "critical"
|
||||
assert report.check_metadata.ServiceName == "secret"
|
||||
assert report.check_metadata.ServiceName == "container-image"
|
||||
assert report.check_metadata.Categories == ["secrets"]
|
||||
|
||||
def test_process_finding_misconfiguration(self):
|
||||
"""Test processing a misconfiguration finding (identified by ID)."""
|
||||
@@ -162,13 +168,14 @@ class TestImageProvider:
|
||||
report = provider._process_finding(
|
||||
SAMPLE_MISCONFIGURATION_FINDING,
|
||||
"myimage:latest",
|
||||
"misconfiguration",
|
||||
"myimage:latest (debian 12)",
|
||||
)
|
||||
|
||||
assert isinstance(report, CheckReportImage)
|
||||
assert report.check_metadata.CheckID == "DS001"
|
||||
assert report.check_metadata.Severity == "medium"
|
||||
assert report.check_metadata.ServiceName == "misconfiguration"
|
||||
assert report.check_metadata.ServiceName == "container-image"
|
||||
assert report.check_metadata.Categories == []
|
||||
|
||||
def test_process_finding_unknown_severity(self):
|
||||
"""Test that UNKNOWN severity is mapped to informational."""
|
||||
@@ -176,7 +183,7 @@ class TestImageProvider:
|
||||
report = provider._process_finding(
|
||||
SAMPLE_UNKNOWN_SEVERITY_FINDING,
|
||||
"myimage:latest",
|
||||
"alpine",
|
||||
"myimage:latest (alpine 3.18.0)",
|
||||
)
|
||||
|
||||
assert report.check_metadata.Severity == "informational"
|
||||
@@ -195,6 +202,9 @@ class TestImageProvider:
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].check_metadata.CheckID == "CVE-2024-1234"
|
||||
assert reports[0].image_sha == SAMPLE_IMAGE_SHA
|
||||
assert reports[0].resource_name == "alpine:3.18"
|
||||
assert reports[0].check_metadata.ServiceName == "container-image"
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_empty_output(self, mock_subprocess):
|
||||
@@ -279,20 +289,23 @@ class TestImageProvider:
|
||||
)
|
||||
assert "alpine:3.18" in output
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_test_connection_success(self, mock_subprocess):
|
||||
@patch("prowler.providers.image.image_provider.create_registry_adapter")
|
||||
def test_test_connection_success(self, mock_factory):
|
||||
"""Test successful connection returns is_connected=True."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
|
||||
mock_adapter = MagicMock()
|
||||
mock_adapter.list_tags.return_value = ["3.18", "latest"]
|
||||
mock_factory.return_value = mock_adapter
|
||||
|
||||
result = ImageProvider.test_connection(image="alpine:3.18")
|
||||
|
||||
assert result.is_connected is True
|
||||
mock_adapter.list_tags.assert_called_once_with("library/alpine")
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_test_connection_auth_failure(self, mock_subprocess):
|
||||
"""Test 401 error returns auth failure."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=1, stderr="401 unauthorized"
|
||||
@patch("prowler.providers.image.image_provider.create_registry_adapter")
|
||||
def test_test_connection_auth_failure(self, mock_factory):
|
||||
"""Test registry auth error returns auth failure."""
|
||||
mock_factory.return_value = MagicMock(
|
||||
list_tags=MagicMock(side_effect=ImageRegistryAuthError(file=__file__))
|
||||
)
|
||||
|
||||
result = ImageProvider.test_connection(image="private/image:latest")
|
||||
@@ -300,16 +313,36 @@ class TestImageProvider:
|
||||
assert result.is_connected is False
|
||||
assert "Authentication failed" in result.error
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_test_connection_not_found(self, mock_subprocess):
|
||||
"""Test 404 error returns not found."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=1, stderr="404 not found")
|
||||
@patch("prowler.providers.image.image_provider.create_registry_adapter")
|
||||
def test_test_connection_not_found(self, mock_factory):
|
||||
"""Test tag not found returns not found error."""
|
||||
mock_adapter = MagicMock()
|
||||
mock_adapter.list_tags.return_value = ["v1", "v2"]
|
||||
mock_factory.return_value = mock_adapter
|
||||
|
||||
result = ImageProvider.test_connection(image="nonexistent/image:latest")
|
||||
|
||||
assert result.is_connected is False
|
||||
assert "not found" in result.error
|
||||
|
||||
@patch("prowler.providers.image.image_provider.create_registry_adapter")
|
||||
def test_test_connection_registry_url(self, mock_factory):
|
||||
"""Test registry URL (namespace) uses list_repositories."""
|
||||
mock_adapter = MagicMock()
|
||||
mock_adapter.list_repositories.return_value = ["andoniaf/myapp"]
|
||||
mock_factory.return_value = mock_adapter
|
||||
|
||||
result = ImageProvider.test_connection(image="docker.io/andoniaf")
|
||||
|
||||
assert result.is_connected is True
|
||||
mock_factory.assert_called_once_with(
|
||||
registry_url="docker.io/andoniaf",
|
||||
username=None,
|
||||
password=None,
|
||||
token=None,
|
||||
)
|
||||
mock_adapter.list_repositories.assert_called_once()
|
||||
|
||||
def test_build_status_extended(self):
|
||||
"""Test status message content for different finding types."""
|
||||
provider = _make_provider()
|
||||
@@ -394,6 +427,51 @@ class TestImageProvider:
|
||||
for _ in provider._scan_single_image("private/image:latest"):
|
||||
pass
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_sha_extraction_from_image_id(self, mock_subprocess):
|
||||
"""Test that image_sha is extracted from Trivy Metadata.ImageID."""
|
||||
provider = _make_provider()
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
|
||||
reports = []
|
||||
for batch in provider._scan_single_image("alpine:3.18"):
|
||||
reports.extend(batch)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].image_sha == SAMPLE_IMAGE_SHA
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_sha_extraction_fallback_to_repo_digests(self, mock_subprocess):
|
||||
"""Test that image_sha falls back to RepoDigests when ImageID is absent."""
|
||||
provider = _make_provider()
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_repo_digest_only_trivy_output(), stderr=""
|
||||
)
|
||||
|
||||
reports = []
|
||||
for batch in provider._scan_single_image("alpine:3.18"):
|
||||
reports.extend(batch)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].image_sha == "e5f6g7h8i9j0"
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_sha_extraction_no_metadata(self, mock_subprocess):
|
||||
"""Test that image_sha is empty when no Metadata is present."""
|
||||
provider = _make_provider()
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_no_metadata_trivy_output(), stderr=""
|
||||
)
|
||||
|
||||
reports = []
|
||||
for batch in provider._scan_single_image("alpine:3.18"):
|
||||
reports.extend(batch)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].image_sha == ""
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_propagates_scan_error(self, mock_subprocess):
|
||||
"""Test that run_scan() re-raises ImageScanError instead of swallowing it."""
|
||||
@@ -409,17 +487,14 @@ class TestImageProvider:
|
||||
pass
|
||||
|
||||
|
||||
@patch.dict(
|
||||
os.environ, {"REGISTRY_USERNAME": "", "REGISTRY_PASSWORD": "", "REGISTRY_TOKEN": ""}
|
||||
)
|
||||
class TestImageProviderRegistryAuth:
|
||||
def test_no_auth_by_default(self):
|
||||
"""Test that no auth is set when no credentials are provided."""
|
||||
provider = _make_provider()
|
||||
|
||||
assert not provider.registry_username
|
||||
assert not provider.registry_password
|
||||
assert not provider.registry_token
|
||||
assert provider.registry_username is None
|
||||
assert provider.registry_password is None
|
||||
assert provider.registry_token is None
|
||||
assert provider.auth_method == "No auth"
|
||||
|
||||
def test_basic_auth_with_explicit_params(self):
|
||||
@@ -431,7 +506,7 @@ class TestImageProviderRegistryAuth:
|
||||
|
||||
assert provider.registry_username == "myuser"
|
||||
assert provider.registry_password == "mypass"
|
||||
assert provider.auth_method == "Basic auth"
|
||||
assert provider.auth_method == "Docker login"
|
||||
|
||||
def test_token_auth_with_explicit_param(self):
|
||||
"""Test token auth via explicit constructor param."""
|
||||
@@ -448,7 +523,7 @@ class TestImageProviderRegistryAuth:
|
||||
registry_token="my-token",
|
||||
)
|
||||
|
||||
assert provider.auth_method == "Basic auth"
|
||||
assert provider.auth_method == "Docker login"
|
||||
|
||||
@patch.dict(
|
||||
os.environ, {"REGISTRY_USERNAME": "envuser", "REGISTRY_PASSWORD": "envpass"}
|
||||
@@ -459,7 +534,7 @@ class TestImageProviderRegistryAuth:
|
||||
|
||||
assert provider.registry_username == "envuser"
|
||||
assert provider.registry_password == "envpass"
|
||||
assert provider.auth_method == "Basic auth"
|
||||
assert provider.auth_method == "Docker login"
|
||||
|
||||
@patch.dict(os.environ, {"REGISTRY_TOKEN": "env-token"})
|
||||
def test_token_auth_from_env_var(self):
|
||||
@@ -491,8 +566,8 @@ class TestImageProviderRegistryAuth:
|
||||
assert "TRIVY_PASSWORD" not in env
|
||||
assert "TRIVY_REGISTRY_TOKEN" not in env
|
||||
|
||||
def test_build_trivy_env_basic_auth_injects_trivy_vars(self):
|
||||
"""Test that _build_trivy_env injects TRIVY_USERNAME/PASSWORD for Trivy native auth."""
|
||||
def test_build_trivy_env_basic_auth_sets_env_vars(self):
|
||||
"""Test that _build_trivy_env injects TRIVY_USERNAME/PASSWORD for native Trivy auth."""
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
@@ -510,8 +585,8 @@ class TestImageProviderRegistryAuth:
|
||||
assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_execute_trivy_injects_trivy_env_with_basic_auth(self, mock_subprocess):
|
||||
"""Test that _execute_trivy sets TRIVY_USERNAME/PASSWORD for Trivy native auth."""
|
||||
def test_execute_trivy_sets_trivy_env_with_basic_auth(self, mock_subprocess):
|
||||
"""Test that _execute_trivy sets TRIVY_USERNAME/PASSWORD for native Trivy auth."""
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
@@ -527,10 +602,12 @@ class TestImageProviderRegistryAuth:
|
||||
assert env["TRIVY_USERNAME"] == "myuser"
|
||||
assert env["TRIVY_PASSWORD"] == "mypass"
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_test_connection_with_basic_auth(self, mock_subprocess):
|
||||
"""Test test_connection passes TRIVY_USERNAME/PASSWORD via env for Trivy native auth."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
|
||||
@patch("prowler.providers.image.image_provider.create_registry_adapter")
|
||||
def test_test_connection_with_basic_auth(self, mock_factory):
|
||||
"""Test test_connection passes credentials to the registry adapter."""
|
||||
mock_adapter = MagicMock()
|
||||
mock_adapter.list_tags.return_value = ["v1"]
|
||||
mock_factory.return_value = mock_adapter
|
||||
|
||||
result = ImageProvider.test_connection(
|
||||
image="private.registry.io/myapp:v1",
|
||||
@@ -539,18 +616,19 @@ class TestImageProviderRegistryAuth:
|
||||
)
|
||||
|
||||
assert result.is_connected is True
|
||||
# Should have 1 subprocess call: trivy only (no docker login/pull/logout)
|
||||
assert mock_subprocess.call_count == 1
|
||||
trivy_call = mock_subprocess.call_args
|
||||
assert trivy_call.args[0][0] == "trivy"
|
||||
env = trivy_call.kwargs.get("env") or trivy_call[1].get("env")
|
||||
assert env["TRIVY_USERNAME"] == "myuser"
|
||||
assert env["TRIVY_PASSWORD"] == "mypass"
|
||||
mock_factory.assert_called_once_with(
|
||||
registry_url="private.registry.io",
|
||||
username="myuser",
|
||||
password="mypass",
|
||||
token=None,
|
||||
)
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_test_connection_with_token(self, mock_subprocess):
|
||||
"""Test test_connection passes token via env."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
|
||||
@patch("prowler.providers.image.image_provider.create_registry_adapter")
|
||||
def test_test_connection_with_token(self, mock_factory):
|
||||
"""Test test_connection passes token to the registry adapter."""
|
||||
mock_adapter = MagicMock()
|
||||
mock_adapter.list_tags.return_value = ["v1"]
|
||||
mock_factory.return_value = mock_adapter
|
||||
|
||||
result = ImageProvider.test_connection(
|
||||
image="private.registry.io/myapp:v1",
|
||||
@@ -558,9 +636,12 @@ class TestImageProviderRegistryAuth:
|
||||
)
|
||||
|
||||
assert result.is_connected is True
|
||||
call_kwargs = mock_subprocess.call_args
|
||||
env = call_kwargs.kwargs.get("env") or call_kwargs[1].get("env")
|
||||
assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
|
||||
mock_factory.assert_called_once_with(
|
||||
registry_url="private.registry.io",
|
||||
username=None,
|
||||
password=None,
|
||||
token="my-token",
|
||||
)
|
||||
|
||||
def test_print_credentials_shows_auth_method(self):
|
||||
"""Test that print_credentials outputs the auth method."""
|
||||
@@ -573,7 +654,7 @@ class TestImageProviderRegistryAuth:
|
||||
output = " ".join(
|
||||
str(call.args[0]) for call in mock_print.call_args_list if call.args
|
||||
)
|
||||
assert "Basic auth" in output
|
||||
assert "Docker login" in output
|
||||
|
||||
|
||||
class TestExtractRegistry:
|
||||
@@ -616,120 +697,42 @@ class TestExtractRegistry:
|
||||
assert ImageProvider._extract_registry("nginx") is None
|
||||
|
||||
|
||||
class TestTrivyAuthIntegration:
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_passes_trivy_env_with_credentials(self, mock_subprocess):
|
||||
"""Test that run_scan() passes TRIVY_USERNAME/PASSWORD via env when credentials are set."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
provider = _make_provider(
|
||||
images=["ghcr.io/user/image:tag"],
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
class TestIsRegistryUrl:
|
||||
def test_registry_url_with_namespace(self):
|
||||
assert ImageProvider._is_registry_url("docker.io/andoniaf") is True
|
||||
|
||||
reports = []
|
||||
for batch in provider.run_scan():
|
||||
reports.extend(batch)
|
||||
def test_registry_url_ghcr(self):
|
||||
assert ImageProvider._is_registry_url("ghcr.io/org") is True
|
||||
|
||||
calls = mock_subprocess.call_args_list
|
||||
# Only trivy calls, no docker login/pull
|
||||
assert all(call.args[0][0] == "trivy" for call in calls)
|
||||
env = calls[0].kwargs.get("env") or calls[0][1].get("env")
|
||||
assert env["TRIVY_USERNAME"] == "myuser"
|
||||
assert env["TRIVY_PASSWORD"] == "mypass"
|
||||
def test_image_ref_with_tag(self):
|
||||
assert ImageProvider._is_registry_url("ghcr.io/user/image:tag") is False
|
||||
|
||||
@patch.dict(os.environ, {"REGISTRY_USERNAME": "", "REGISTRY_PASSWORD": ""})
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_no_trivy_auth_without_credentials(self, mock_subprocess):
|
||||
"""Test that run_scan() does NOT set TRIVY_USERNAME/PASSWORD when no credentials."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
def test_image_ref_with_repo(self):
|
||||
assert ImageProvider._is_registry_url("ghcr.io/user/image") is False
|
||||
|
||||
def test_dockerhub_short_image(self):
|
||||
assert ImageProvider._is_registry_url("alpine:3.18") is False
|
||||
|
||||
def test_dockerhub_with_namespace(self):
|
||||
assert ImageProvider._is_registry_url("andoniaf/test:tag") is False
|
||||
|
||||
def test_bare_image_name(self):
|
||||
assert ImageProvider._is_registry_url("nginx") is False
|
||||
|
||||
def test_localhost_namespace(self):
|
||||
assert ImageProvider._is_registry_url("localhost:5000/myns") is True
|
||||
|
||||
def test_localhost_image_with_tag(self):
|
||||
assert ImageProvider._is_registry_url("localhost:5000/myns/image:v1") is False
|
||||
|
||||
|
||||
class TestCleanup:
|
||||
def test_cleanup_idempotent(self):
|
||||
"""Test cleanup is safe to call multiple times."""
|
||||
provider = _make_provider()
|
||||
|
||||
for batch in provider.run_scan():
|
||||
pass
|
||||
|
||||
calls = mock_subprocess.call_args_list
|
||||
assert all(call.args[0][0] == "trivy" for call in calls)
|
||||
|
||||
@patch.dict(os.environ, {"REGISTRY_USERNAME": "", "REGISTRY_PASSWORD": ""})
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_token_auth_via_env(self, mock_subprocess):
|
||||
"""Test that run_scan() passes TRIVY_REGISTRY_TOKEN when only token is provided."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
provider = _make_provider(registry_token="my-token")
|
||||
|
||||
for batch in provider.run_scan():
|
||||
pass
|
||||
|
||||
calls = mock_subprocess.call_args_list
|
||||
assert all(call.args[0][0] == "trivy" for call in calls)
|
||||
env = calls[0].kwargs.get("env") or calls[0][1].get("env")
|
||||
assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_run_with_credentials_only_calls_trivy(self, mock_subprocess):
|
||||
"""Test that run() only calls trivy (no docker login/pull/logout)."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
provider = _make_provider(
|
||||
images=["ghcr.io/user/image:tag"],
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
provider.run()
|
||||
|
||||
calls = mock_subprocess.call_args_list
|
||||
assert all(call.args[0][0] == "trivy" for call in calls)
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_multiple_images_all_get_trivy_env(self, mock_subprocess):
|
||||
"""Test that all trivy calls get TRIVY_USERNAME/PASSWORD when scanning multiple images."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
provider = _make_provider(
|
||||
images=["ghcr.io/user/image1:tag", "ghcr.io/user/image2:tag"],
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
for batch in provider.run_scan():
|
||||
pass
|
||||
|
||||
calls = mock_subprocess.call_args_list
|
||||
trivy_calls = [c for c in calls if c.args[0][0] == "trivy"]
|
||||
assert len(trivy_calls) == 2
|
||||
for call in trivy_calls:
|
||||
env = call.kwargs.get("env") or call[1].get("env")
|
||||
assert env["TRIVY_USERNAME"] == "myuser"
|
||||
assert env["TRIVY_PASSWORD"] == "mypass"
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_test_connection_docker_hub_uses_trivy_auth(self, mock_subprocess):
|
||||
"""Test test_connection passes TRIVY creds for Docker Hub images."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
|
||||
|
||||
result = ImageProvider.test_connection(
|
||||
image="andoniaf/test-private:tag",
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
assert result.is_connected is True
|
||||
assert mock_subprocess.call_count == 1
|
||||
trivy_call = mock_subprocess.call_args
|
||||
assert trivy_call.args[0][0] == "trivy"
|
||||
env = trivy_call.kwargs.get("env") or trivy_call[1].get("env")
|
||||
assert env["TRIVY_USERNAME"] == "myuser"
|
||||
assert env["TRIVY_PASSWORD"] == "mypass"
|
||||
provider.cleanup()
|
||||
provider.cleanup()
|
||||
|
||||
|
||||
class TestImageProviderInputValidation:
|
||||
@@ -921,3 +924,67 @@ class TestImageProviderNameValidation:
|
||||
|
||||
with pytest.raises(ImageListFileReadError):
|
||||
_make_provider(images=None, image_list_file=file_path)
|
||||
|
||||
|
||||
class TestScanPerImage:
|
||||
@patch("subprocess.run")
|
||||
def test_yields_per_image(self, mock_subprocess):
|
||||
"""Test that scan_per_image yields (name, findings) per image."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
provider = _make_provider(images=["alpine:3.18", "nginx:latest"])
|
||||
|
||||
results = list(provider.scan_per_image())
|
||||
|
||||
assert len(results) == 2
|
||||
for name, findings in results:
|
||||
assert isinstance(name, str)
|
||||
assert isinstance(findings, list)
|
||||
assert all(isinstance(f, CheckReportImage) for f in findings)
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_reraises_scan_error(self, mock_subprocess):
|
||||
"""Test that ImageScanError propagates from scan_per_image."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=1, stdout="", stderr="scan failed"
|
||||
)
|
||||
provider = _make_provider(images=["alpine:3.18"])
|
||||
|
||||
with pytest.raises(ImageScanError):
|
||||
list(provider.scan_per_image())
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_skips_generic_error(self, mock_subprocess):
|
||||
"""Test that a generic RuntimeError in _scan_single_image yields empty findings and continues."""
|
||||
|
||||
def side_effect(cmd, **kwargs):
|
||||
if "bad:image" in cmd:
|
||||
raise RuntimeError("unexpected error")
|
||||
return MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
|
||||
mock_subprocess.side_effect = side_effect
|
||||
provider = _make_provider(images=["bad:image", "alpine:3.18"])
|
||||
|
||||
results = list(provider.scan_per_image())
|
||||
|
||||
assert len(results) == 2
|
||||
assert results[0][0] == "bad:image"
|
||||
assert results[0][1] == []
|
||||
assert results[1][0] == "alpine:3.18"
|
||||
assert len(results[1][1]) > 0
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_calls_cleanup(self, mock_subprocess):
|
||||
"""Test that cleanup is called even after scan_per_image completes."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
provider = _make_provider(images=["alpine:3.18"])
|
||||
|
||||
with mock.patch.object(provider, "cleanup") as mock_cleanup:
|
||||
list(provider.scan_per_image())
|
||||
|
||||
mock_cleanup.assert_called_once()
|
||||
|
||||
@@ -99,7 +99,7 @@ class TestDockerHubListTags:
|
||||
class TestDockerHubLogin:
|
||||
@patch("prowler.providers.image.lib.registry.base.requests.request")
|
||||
def test_login_failure(self, mock_request):
|
||||
resp = MagicMock(status_code=401)
|
||||
resp = MagicMock(status_code=401, text="invalid credentials")
|
||||
mock_request.return_value = resp
|
||||
adapter = DockerHubAdapter("docker.io/myorg", username="bad", password="creds")
|
||||
with pytest.raises(ImageRegistryAuthError, match="login failed"):
|
||||
@@ -110,6 +110,29 @@ class TestDockerHubLogin:
|
||||
adapter._hub_login() # Should not raise
|
||||
assert adapter._hub_jwt is None
|
||||
|
||||
@patch("prowler.providers.image.lib.registry.base.requests.request")
|
||||
def test_login_401_includes_response_body(self, mock_request):
|
||||
resp = MagicMock(
|
||||
status_code=401, text='{"detail":"Incorrect authentication credentials"}'
|
||||
)
|
||||
mock_request.return_value = resp
|
||||
adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
|
||||
with pytest.raises(
|
||||
ImageRegistryAuthError, match="Incorrect authentication credentials"
|
||||
):
|
||||
adapter._hub_login()
|
||||
|
||||
@patch("prowler.providers.image.lib.registry.base.time.sleep")
|
||||
@patch("prowler.providers.image.lib.registry.base.requests.request")
|
||||
def test_login_500_retried_then_raises_network_error(
|
||||
self, mock_request, mock_sleep
|
||||
):
|
||||
mock_request.return_value = MagicMock(status_code=500)
|
||||
adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
|
||||
with pytest.raises(ImageRegistryNetworkError, match="Server error"):
|
||||
adapter._hub_login()
|
||||
assert mock_request.call_count == 3
|
||||
|
||||
|
||||
class TestDockerHubRetry:
|
||||
@patch("prowler.providers.image.lib.registry.base.time.sleep")
|
||||
@@ -133,6 +156,63 @@ class TestDockerHubRetry:
|
||||
adapter._request_with_retry("GET", "https://hub.docker.com")
|
||||
assert mock_request.call_count == 3
|
||||
|
||||
@patch("prowler.providers.image.lib.registry.base.time.sleep")
|
||||
@patch("prowler.providers.image.lib.registry.base.requests.request")
|
||||
def test_retry_on_500(self, mock_request, mock_sleep):
|
||||
resp_500 = MagicMock(status_code=500)
|
||||
resp_200 = MagicMock(status_code=200)
|
||||
mock_request.side_effect = [resp_500, resp_200]
|
||||
adapter = DockerHubAdapter("docker.io/myorg")
|
||||
result = adapter._request_with_retry("GET", "https://hub.docker.com")
|
||||
assert result.status_code == 200
|
||||
assert mock_request.call_count == 2
|
||||
mock_sleep.assert_called_once()
|
||||
|
||||
@patch("prowler.providers.image.lib.registry.base.time.sleep")
|
||||
@patch("prowler.providers.image.lib.registry.base.requests.request")
|
||||
def test_retry_exhausted_on_500_raises_network_error(
|
||||
self, mock_request, mock_sleep
|
||||
):
|
||||
mock_request.return_value = MagicMock(status_code=500)
|
||||
adapter = DockerHubAdapter("docker.io/myorg")
|
||||
with pytest.raises(
|
||||
ImageRegistryNetworkError, match="Server error.*HTTP 500.*3 attempts"
|
||||
):
|
||||
adapter._request_with_retry("GET", "https://hub.docker.com")
|
||||
assert mock_request.call_count == 3
|
||||
|
||||
@patch("prowler.providers.image.lib.registry.base.time.sleep")
|
||||
@patch("prowler.providers.image.lib.registry.base.requests.request")
|
||||
def test_4xx_not_retried(self, mock_request, mock_sleep):
|
||||
mock_request.return_value = MagicMock(status_code=403)
|
||||
adapter = DockerHubAdapter("docker.io/myorg")
|
||||
result = adapter._request_with_retry("GET", "https://hub.docker.com")
|
||||
assert result.status_code == 403
|
||||
assert mock_request.call_count == 1
|
||||
mock_sleep.assert_not_called()
|
||||
|
||||
@patch("prowler.providers.image.lib.registry.base.requests.request")
|
||||
def test_request_sends_user_agent(self, mock_request):
|
||||
mock_request.return_value = MagicMock(status_code=200)
|
||||
adapter = DockerHubAdapter("docker.io/myorg")
|
||||
adapter._request_with_retry("GET", "https://hub.docker.com")
|
||||
_, kwargs = mock_request.call_args
|
||||
from prowler.config.config import prowler_version
|
||||
|
||||
assert (
|
||||
kwargs["headers"]["User-Agent"]
|
||||
== f"Prowler/{prowler_version} (registry-adapter)"
|
||||
)
|
||||
|
||||
@patch("prowler.providers.image.lib.registry.base.time.sleep")
|
||||
@patch("prowler.providers.image.lib.registry.base.requests.request")
|
||||
def test_retry_500_includes_response_body(self, mock_request, mock_sleep):
|
||||
resp_500 = MagicMock(status_code=500, text="<html>Cloudflare error</html>")
|
||||
mock_request.return_value = resp_500
|
||||
adapter = DockerHubAdapter("docker.io/myorg")
|
||||
with pytest.raises(ImageRegistryNetworkError, match="Cloudflare error"):
|
||||
adapter._request_with_retry("GET", "https://hub.docker.com")
|
||||
|
||||
|
||||
class TestDockerHubEmptyTokens:
|
||||
@patch("prowler.providers.image.lib.registry.base.requests.request")
|
||||
|
||||
@@ -288,31 +288,33 @@ class TestOciAdapterRetry:
|
||||
|
||||
class TestOciAdapterNextPageUrl:
|
||||
def test_no_link_header(self):
|
||||
adapter = OciRegistryAdapter("reg.io")
|
||||
resp = MagicMock(headers={})
|
||||
assert adapter._next_page_url(resp) is None
|
||||
assert OciRegistryAdapter._next_page_url(resp) is None
|
||||
|
||||
def test_link_header_with_next(self):
|
||||
adapter = OciRegistryAdapter("reg.io")
|
||||
resp = MagicMock(
|
||||
headers={"Link": '<https://reg.io/v2/_catalog?n=200&last=b>; rel="next"'}
|
||||
)
|
||||
assert adapter._next_page_url(resp) == "https://reg.io/v2/_catalog?n=200&last=b"
|
||||
assert (
|
||||
OciRegistryAdapter._next_page_url(resp)
|
||||
== "https://reg.io/v2/_catalog?n=200&last=b"
|
||||
)
|
||||
|
||||
def test_link_header_relative_url(self):
|
||||
resp = MagicMock(
|
||||
headers={"Link": '</v2/_catalog?n=200&last=b>; rel="next"'},
|
||||
url="https://reg.io/v2/_catalog?n=200",
|
||||
)
|
||||
assert (
|
||||
OciRegistryAdapter._next_page_url(resp)
|
||||
== "https://reg.io/v2/_catalog?n=200&last=b"
|
||||
)
|
||||
|
||||
def test_link_header_no_next(self):
|
||||
adapter = OciRegistryAdapter("reg.io")
|
||||
resp = MagicMock(
|
||||
headers={"Link": '<https://reg.io/v2/_catalog?n=200>; rel="prev"'}
|
||||
)
|
||||
assert adapter._next_page_url(resp) is None
|
||||
|
||||
def test_link_header_relative_url(self):
|
||||
adapter = OciRegistryAdapter("reg.io")
|
||||
resp = MagicMock(
|
||||
url="https://reg.io/v2/_catalog?n=200",
|
||||
headers={"Link": '</v2/_catalog?last=b&n=200>; rel="next"'},
|
||||
)
|
||||
assert adapter._next_page_url(resp) == "https://reg.io/v2/_catalog?last=b&n=200"
|
||||
assert OciRegistryAdapter._next_page_url(resp) is None
|
||||
|
||||
|
||||
class TestOciAdapterSSRF:
|
||||
|
||||
@@ -152,16 +152,15 @@ class TestEmptyRegistry:
|
||||
|
||||
class TestRegistryList:
|
||||
@patch("prowler.providers.image.image_provider.create_registry_adapter")
|
||||
def test_registry_list_prints_and_exits(self, mock_factory, capsys):
|
||||
def test_registry_list_prints_and_returns(self, mock_factory, capsys):
|
||||
adapter = MagicMock()
|
||||
adapter.list_repositories.return_value = ["app/frontend", "app/backend"]
|
||||
adapter.list_tags.side_effect = [["latest", "v1.0"], ["latest"]]
|
||||
mock_factory.return_value = adapter
|
||||
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
_build_provider(registry_list_images=True)
|
||||
provider = _build_provider(registry_list_images=True)
|
||||
|
||||
assert exc_info.value.code == 0
|
||||
assert provider._listing_only is True
|
||||
captured = capsys.readouterr()
|
||||
assert "app/frontend" in captured.out
|
||||
assert "app/backend" in captured.out
|
||||
@@ -177,10 +176,9 @@ class TestRegistryList:
|
||||
adapter.list_tags.return_value = ["latest"]
|
||||
mock_factory.return_value = adapter
|
||||
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
_build_provider(registry_list_images=True, image_filter="^prod/")
|
||||
provider = _build_provider(registry_list_images=True, image_filter="^prod/")
|
||||
|
||||
assert exc_info.value.code == 0
|
||||
assert provider._listing_only is True
|
||||
captured = capsys.readouterr()
|
||||
assert "prod/app" in captured.out
|
||||
assert "dev/app" not in captured.out
|
||||
@@ -193,10 +191,9 @@ class TestRegistryList:
|
||||
adapter.list_tags.return_value = ["latest", "v1.0", "dev-abc"]
|
||||
mock_factory.return_value = adapter
|
||||
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
_build_provider(registry_list_images=True, tag_filter=r"^v\d+\.\d+$")
|
||||
provider = _build_provider(registry_list_images=True, tag_filter=r"^v\d+\.\d+$")
|
||||
|
||||
assert exc_info.value.code == 0
|
||||
assert provider._listing_only is True
|
||||
captured = capsys.readouterr()
|
||||
assert "v1.0" in captured.out
|
||||
assert "dev-abc" not in captured.out
|
||||
@@ -210,10 +207,9 @@ class TestRegistryList:
|
||||
mock_factory.return_value = adapter
|
||||
|
||||
# max_images=1 would normally raise, but --registry-list skips it
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
_build_provider(registry_list_images=True, max_images=1)
|
||||
provider = _build_provider(registry_list_images=True, max_images=1)
|
||||
|
||||
assert exc_info.value.code == 0
|
||||
assert provider._listing_only is True
|
||||
captured = capsys.readouterr()
|
||||
assert "6 images" in captured.out
|
||||
|
||||
|
||||
Reference in New Issue
Block a user