Compare commits

...

1 Commits

Author SHA1 Message Date
Andoni A.
f7adbc2dcf feat(image): add docker login and pull for private registry authentication
Trivy's remote source cannot authenticate against Docker Hub (and some
other registries) even after docker login. This adds a docker login +
docker pull flow before scanning so Trivy can access private images
from the local Docker daemon.

- Add _docker_login, _docker_pull, _docker_logout, cleanup methods
- Add _extract_registry to determine registry from image reference
- Wrap run() in try/finally to ensure cleanup on success or error
- Wire registry credentials from CLI args to ImageProvider
- Add ImageDockerLoginError and ImageDockerNotFoundError exceptions
2026-02-06 15:08:42 +01:00
4 changed files with 837 additions and 6 deletions

View File

@@ -284,6 +284,9 @@ class Provider(ABC):
timeout=arguments.timeout,
config_path=arguments.config_file,
fixer_config=fixer_config,
registry_username=arguments.registry_username,
registry_password=arguments.registry_password,
registry_token=arguments.registry_token,
)
elif "mongodbatlas" in provider_class_name.lower():
provider_class(

View File

@@ -30,6 +30,14 @@ class ImageBaseException(ProwlerException):
"message": "Error scanning container image.",
"remediation": "Check the image name and ensure it is accessible.",
},
(9006, "ImageDockerLoginError"): {
"message": "Docker login failed for registry authentication.",
"remediation": "Check your registry credentials and ensure the registry is reachable.",
},
(9007, "ImageDockerNotFoundError"): {
"message": "Docker binary not found.",
"remediation": "Install Docker to enable private registry authentication via docker login.",
},
}
def __init__(self, code, file=None, original_exception=None, message=None):
@@ -97,3 +105,21 @@ class ImageScanError(ImageBaseException):
super().__init__(
9005, file=file, original_exception=original_exception, message=message
)
class ImageDockerLoginError(ImageBaseException):
"""Exception raised when docker login fails."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
9006, file=file, original_exception=original_exception, message=message
)
class ImageDockerNotFoundError(ImageBaseException):
"""Exception raised when the docker binary is not found."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
9007, file=file, original_exception=original_exception, message=message
)

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import json
import os
import subprocess
import sys
from typing import Generator, List
@@ -18,6 +19,8 @@ from prowler.lib.utils.utils import print_boxes
from prowler.providers.common.models import Audit_Metadata, Connection
from prowler.providers.common.provider import Provider
from prowler.providers.image.exceptions.exceptions import (
ImageDockerLoginError,
ImageDockerNotFoundError,
ImageFindingProcessingError,
ImageListFileNotFoundError,
ImageListFileReadError,
@@ -49,6 +52,9 @@ class ImageProvider(Provider):
config_path: str = None,
config_content: dict = None,
fixer_config: dict | None = None,
registry_username: str = None,
registry_password: str = None,
registry_token: str = None,
):
logger.info("Instantiating Image Provider...")
@@ -62,7 +68,21 @@ class ImageProvider(Provider):
self.audited_account = "image-scan"
self._session = None
self._identity = "prowler"
self._auth_method = "No auth"
# Registry authentication (follows IaC pattern: explicit params, env vars internal)
self.registry_username = registry_username or os.environ.get("TRIVY_USERNAME")
self.registry_password = registry_password or os.environ.get("TRIVY_PASSWORD")
self.registry_token = registry_token or os.environ.get("TRIVY_REGISTRY_TOKEN")
self._logged_in_registries: set = set()
if self.registry_username and self.registry_password:
self._auth_method = "Docker login"
logger.info("Using docker login for registry authentication")
elif self.registry_token:
self._auth_method = "Registry token"
logger.info("Using registry token for authentication")
else:
self._auth_method = "No auth"
# Load images from file if provided
if image_list_file:
@@ -150,6 +170,100 @@ class ImageProvider(Provider):
"""Image provider doesn't need a session since it uses Trivy directly"""
return None
@staticmethod
def _extract_registry(image: str) -> str | None:
"""Extract registry hostname from an image reference.
Returns None for Docker Hub images (no registry prefix).
"""
parts = image.split("/")
if len(parts) >= 2 and ("." in parts[0] or ":" in parts[0]):
return parts[0]
return None
def _docker_login(self, registry: str | None) -> None:
"""Run docker login --password-stdin for a registry."""
registry_label = registry or "Docker Hub"
cmd = [
"docker",
"login",
"--username",
self.registry_username,
"--password-stdin",
]
if registry:
cmd.append(registry)
try:
process = subprocess.run(
cmd,
input=self.registry_password,
capture_output=True,
text=True,
)
if process.returncode != 0:
raise ImageDockerLoginError(
file=__file__,
message=f"docker login failed for {registry_label}: {process.stderr.strip()}",
)
logger.info(f"Docker login successful for {registry_label}")
self._logged_in_registries.add(registry)
except FileNotFoundError:
raise ImageDockerNotFoundError(
file=__file__,
message="Docker binary not found. Install Docker to use registry authentication.",
)
def _docker_pull(self, image: str) -> None:
"""Pull an image using docker pull so Trivy can scan it from the local daemon.
Trivy's remote source cannot authenticate against Docker Hub (and some
other registries) even after docker login. Pulling into the local
Docker daemon first lets Trivy find the image via its "docker" source.
"""
try:
logger.info(f"Pulling image {image} via docker...")
process = subprocess.run(
["docker", "pull", image],
capture_output=True,
text=True,
)
if process.returncode != 0:
raise ImageScanError(
file=__file__,
message=f"docker pull failed for {image}: {process.stderr.strip()}",
)
logger.info(f"Successfully pulled {image}")
except FileNotFoundError:
raise ImageDockerNotFoundError(
file=__file__,
message="Docker binary not found. Install Docker to use registry authentication.",
)
def _docker_logout(self, registry: str | None) -> None:
"""Run docker logout for a registry. Logs warning on failure, never raises."""
registry_label = registry or "Docker Hub"
cmd = ["docker", "logout"]
if registry:
cmd.append(registry)
try:
process = subprocess.run(cmd, capture_output=True, text=True)
if process.returncode != 0:
logger.warning(
f"docker logout failed for {registry_label}: {process.stderr.strip()}"
)
else:
logger.info(f"Docker logout successful for {registry_label}")
except Exception as error:
logger.warning(f"docker logout failed for {registry_label}: {error}")
def cleanup(self) -> None:
"""Logout from all registries that were logged in during the scan."""
for registry in self._logged_in_registries:
self._docker_logout(registry)
self._logged_in_registries.clear()
def _process_finding(
self, finding: dict, image_name: str, finding_type: str
) -> CheckReportImage:
@@ -273,10 +387,13 @@ class ImageProvider(Provider):
def run(self) -> List[CheckReportImage]:
"""Execute the container image scan."""
reports = []
for batch in self.run_scan():
reports.extend(batch)
return reports
try:
reports = []
for batch in self.run_scan():
reports.extend(batch)
return reports
finally:
self.cleanup()
def run_scan(self) -> Generator[List[CheckReportImage], None, None]:
"""
@@ -285,6 +402,13 @@ class ImageProvider(Provider):
Yields:
List[CheckReportImage]: Batches of findings
"""
if self.registry_username and self.registry_password:
registries = {self._extract_registry(img) for img in self.images}
for registry in registries:
self._docker_login(registry)
for image in self.images:
self._docker_pull(image)
for image in self.images:
try:
yield from self._scan_single_image(image)
@@ -407,8 +531,22 @@ class ImageProvider(Provider):
)
logger.error(f"Error scanning image {image}: {error}")
def _build_trivy_env(self) -> dict:
"""Build environment variables for Trivy, injecting registry credentials.
When username+password are provided, we rely on docker login (which stores
credentials in Docker's credential store). Setting TRIVY_USERNAME/TRIVY_PASSWORD
would cause Trivy to attempt its own token-exchange auth, which fails for
Docker Hub and other registries that require docker login.
"""
env = dict(os.environ)
if self.registry_token:
env["TRIVY_REGISTRY_TOKEN"] = self.registry_token
return env
def _execute_trivy(self, command: list, image: str) -> subprocess.CompletedProcess:
"""Execute Trivy command with optional progress bar."""
env = self._build_trivy_env()
try:
if sys.stdout.isatty():
with alive_bar(
@@ -423,6 +561,7 @@ class ImageProvider(Provider):
command,
capture_output=True,
text=True,
env=env,
)
bar.title = f"-> Scan completed for {image}"
return process
@@ -432,12 +571,13 @@ class ImageProvider(Provider):
command,
capture_output=True,
text=True,
env=env,
)
logger.info(f"Scan completed for {image}")
return process
except (AttributeError, OSError):
logger.info(f"Scanning {image}...")
return subprocess.run(command, capture_output=True, text=True)
return subprocess.run(command, capture_output=True, text=True, env=env)
def _log_trivy_stderr(self, stderr: str) -> None:
"""Parse and log Trivy's stderr output."""
@@ -507,6 +647,10 @@ class ImageProvider(Provider):
report_lines.append(f"Timeout: {Fore.YELLOW}{self.timeout}{Style.RESET_ALL}")
report_lines.append(
f"Authentication method: {Fore.YELLOW}{self.auth_method}{Style.RESET_ALL}"
)
print_boxes(report_lines, report_title)
@staticmethod
@@ -514,6 +658,9 @@ class ImageProvider(Provider):
image: str = None,
raise_on_exception: bool = True,
provider_id: str = None,
registry_username: str = None,
registry_password: str = None,
registry_token: str = None,
) -> "Connection":
"""
Test connection to container registry by attempting to inspect an image.
@@ -522,10 +669,15 @@ class ImageProvider(Provider):
image: Container image to test
raise_on_exception: Whether to raise exceptions
provider_id: Fallback for image name
registry_username: Registry username for basic auth
registry_password: Registry password for basic auth
registry_token: Registry token for token-based auth
Returns:
Connection: Connection object with success status
"""
docker_logged_in = False
registry = None
try:
if provider_id and not image:
image = provider_id
@@ -533,6 +685,55 @@ class ImageProvider(Provider):
if not image:
return Connection(is_connected=False, error="Image name is required")
# Build env with registry credentials
env = dict(os.environ)
if registry_username and registry_password:
# Docker login for reliable private registry auth
registry = ImageProvider._extract_registry(image)
registry_label = registry or "Docker Hub"
login_cmd = [
"docker",
"login",
"--username",
registry_username,
"--password-stdin",
]
if registry:
login_cmd.append(registry)
try:
login_result = subprocess.run(
login_cmd,
input=registry_password,
capture_output=True,
text=True,
)
if login_result.returncode == 0:
docker_logged_in = True
logger.info(
f"Docker login successful for {registry_label} (test_connection)"
)
# Pull the image so Trivy can scan from the local daemon
pull_result = subprocess.run(
["docker", "pull", image],
capture_output=True,
text=True,
)
if pull_result.returncode != 0:
return Connection(
is_connected=False,
error=f"docker pull failed for {image}: {pull_result.stderr.strip()}",
)
else:
logger.warning(
f"Docker login failed for {registry_label}: {login_result.stderr.strip()}"
)
except FileNotFoundError:
logger.warning(
"Docker binary not found, skipping docker login for test_connection"
)
elif registry_token:
env["TRIVY_REGISTRY_TOKEN"] = registry_token
# Test by running trivy with --skip-update to just test image access
process = subprocess.run(
[
@@ -545,6 +746,7 @@ class ImageProvider(Provider):
capture_output=True,
text=True,
timeout=60,
env=env,
)
if process.returncode == 0:
@@ -584,3 +786,12 @@ class ImageProvider(Provider):
is_connected=False,
error=f"Unexpected error: {str(error)}",
)
finally:
if docker_logged_in:
logout_cmd = ["docker", "logout"]
if registry:
logout_cmd.append(registry)
try:
subprocess.run(logout_cmd, capture_output=True, text=True)
except Exception:
pass

View File

@@ -1,3 +1,4 @@
import os
import tempfile
from unittest import mock
from unittest.mock import MagicMock, patch
@@ -6,6 +7,8 @@ import pytest
from prowler.lib.check.models import CheckReportImage
from prowler.providers.image.exceptions.exceptions import (
ImageDockerLoginError,
ImageDockerNotFoundError,
ImageListFileNotFoundError,
ImageNoImagesProvidedError,
ImageScanError,
@@ -395,3 +398,591 @@ class TestImageProvider:
with pytest.raises(ImageScanError):
for _ in provider.run_scan():
pass
class TestImageProviderRegistryAuth:
def test_no_auth_by_default(self):
"""Test that no auth is set when no credentials are provided."""
provider = _make_provider()
assert provider.registry_username is None
assert provider.registry_password is None
assert provider.registry_token is None
assert provider.auth_method == "No auth"
def test_basic_auth_with_explicit_params(self):
"""Test basic auth via explicit constructor params."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
assert provider.registry_username == "myuser"
assert provider.registry_password == "mypass"
assert provider.auth_method == "Docker login"
def test_token_auth_with_explicit_param(self):
"""Test token auth via explicit constructor param."""
provider = _make_provider(registry_token="my-token-123")
assert provider.registry_token == "my-token-123"
assert provider.auth_method == "Registry token"
def test_basic_auth_takes_precedence_over_token(self):
"""Test that username/password takes precedence over token."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
registry_token="my-token",
)
assert provider.auth_method == "Docker login"
@patch.dict(os.environ, {"TRIVY_USERNAME": "envuser", "TRIVY_PASSWORD": "envpass"})
def test_basic_auth_from_env_vars(self):
"""Test that env vars are used as fallback for basic auth."""
provider = _make_provider()
assert provider.registry_username == "envuser"
assert provider.registry_password == "envpass"
assert provider.auth_method == "Docker login"
@patch.dict(os.environ, {"TRIVY_REGISTRY_TOKEN": "env-token"})
def test_token_auth_from_env_var(self):
"""Test that env var is used as fallback for token auth."""
provider = _make_provider()
assert provider.registry_token == "env-token"
assert provider.auth_method == "Registry token"
@patch.dict(os.environ, {"TRIVY_USERNAME": "envuser", "TRIVY_PASSWORD": "envpass"})
def test_explicit_params_override_env_vars(self):
"""Test that explicit params take precedence over env vars."""
provider = _make_provider(
registry_username="explicit",
registry_password="explicit-pass",
)
assert provider.registry_username == "explicit"
assert provider.registry_password == "explicit-pass"
def test_build_trivy_env_no_auth(self):
"""Test that _build_trivy_env returns base env when no auth."""
provider = _make_provider()
env = provider._build_trivy_env()
assert "TRIVY_USERNAME" not in env
assert "TRIVY_PASSWORD" not in env
assert "TRIVY_REGISTRY_TOKEN" not in env
def test_build_trivy_env_basic_auth_no_env_vars(self):
"""Test that _build_trivy_env does NOT inject TRIVY_USERNAME/PASSWORD (docker login handles it)."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
env = provider._build_trivy_env()
assert "TRIVY_USERNAME" not in env
assert "TRIVY_PASSWORD" not in env
def test_build_trivy_env_token_auth(self):
"""Test that _build_trivy_env injects registry token."""
provider = _make_provider(registry_token="my-token")
env = provider._build_trivy_env()
assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
@patch("subprocess.run")
def test_execute_trivy_no_trivy_env_with_basic_auth(self, mock_subprocess):
"""Test that _execute_trivy does NOT set TRIVY_USERNAME/PASSWORD (docker login handles auth)."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
provider._execute_trivy(["trivy", "image", "alpine:3.18"], "alpine:3.18")
call_kwargs = mock_subprocess.call_args
env = call_kwargs.kwargs.get("env") or call_kwargs[1].get("env")
assert "TRIVY_USERNAME" not in env
assert "TRIVY_PASSWORD" not in env
@patch("subprocess.run")
def test_test_connection_with_basic_auth(self, mock_subprocess):
"""Test test_connection does docker login, pull, trivy, logout and does NOT set TRIVY_* env vars."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
result = ImageProvider.test_connection(
image="private.registry.io/myapp:v1",
registry_username="myuser",
registry_password="mypass",
)
assert result.is_connected is True
# Should have 4 subprocess calls: docker login, docker pull, trivy, docker logout
assert mock_subprocess.call_count == 4
login_call = mock_subprocess.call_args_list[0]
assert login_call.args[0] == [
"docker",
"login",
"--username",
"myuser",
"--password-stdin",
"private.registry.io",
]
assert login_call.kwargs["input"] == "mypass"
pull_call = mock_subprocess.call_args_list[1]
assert pull_call.args[0] == ["docker", "pull", "private.registry.io/myapp:v1"]
trivy_call = mock_subprocess.call_args_list[2]
env = trivy_call.kwargs.get("env") or trivy_call[1].get("env")
assert "TRIVY_USERNAME" not in env
assert "TRIVY_PASSWORD" not in env
logout_call = mock_subprocess.call_args_list[3]
assert logout_call.args[0] == ["docker", "logout", "private.registry.io"]
@patch("subprocess.run")
def test_test_connection_with_token(self, mock_subprocess):
"""Test test_connection passes token via env."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
result = ImageProvider.test_connection(
image="private.registry.io/myapp:v1",
registry_token="my-token",
)
assert result.is_connected is True
call_kwargs = mock_subprocess.call_args
env = call_kwargs.kwargs.get("env") or call_kwargs[1].get("env")
assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
def test_print_credentials_shows_auth_method(self):
"""Test that print_credentials outputs the auth method."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
with mock.patch("builtins.print") as mock_print:
provider.print_credentials()
output = " ".join(
str(call.args[0]) for call in mock_print.call_args_list if call.args
)
assert "Docker login" in output
class TestExtractRegistry:
def test_docker_hub_simple(self):
assert ImageProvider._extract_registry("alpine:3.18") is None
def test_docker_hub_with_namespace(self):
assert ImageProvider._extract_registry("andoniaf/test-private:tag") is None
def test_ghcr(self):
assert ImageProvider._extract_registry("ghcr.io/user/image:tag") == "ghcr.io"
def test_ecr(self):
assert (
ImageProvider._extract_registry(
"123456789012.dkr.ecr.us-east-1.amazonaws.com/repo:tag"
)
== "123456789012.dkr.ecr.us-east-1.amazonaws.com"
)
def test_localhost_with_port(self):
assert (
ImageProvider._extract_registry("localhost:5000/myimage:latest")
== "localhost:5000"
)
def test_custom_registry_with_port(self):
assert (
ImageProvider._extract_registry("myregistry.io:5000/image:tag")
== "myregistry.io:5000"
)
def test_digest_reference(self):
assert (
ImageProvider._extract_registry("ghcr.io/user/image@sha256:abc123")
== "ghcr.io"
)
def test_bare_image_name(self):
assert ImageProvider._extract_registry("nginx") is None
class TestDockerLogin:
@patch("subprocess.run")
def test_login_success(self, mock_subprocess):
"""Test successful docker login."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
provider._docker_login("ghcr.io")
mock_subprocess.assert_called_once()
call_args = mock_subprocess.call_args
assert call_args.args[0] == [
"docker",
"login",
"--username",
"myuser",
"--password-stdin",
"ghcr.io",
]
assert call_args.kwargs["input"] == "mypass"
assert "ghcr.io" in provider._logged_in_registries
@patch("subprocess.run")
def test_login_docker_hub(self, mock_subprocess):
"""Test docker login for Docker Hub (registry=None)."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
provider._docker_login(None)
call_args = mock_subprocess.call_args
assert call_args.args[0] == [
"docker",
"login",
"--username",
"myuser",
"--password-stdin",
]
assert None in provider._logged_in_registries
@patch("subprocess.run")
def test_login_failure_raises(self, mock_subprocess):
"""Test docker login failure raises ImageDockerLoginError."""
mock_subprocess.return_value = MagicMock(
returncode=1, stderr="unauthorized: incorrect username or password"
)
provider = _make_provider(
registry_username="myuser",
registry_password="badpass",
)
with pytest.raises(ImageDockerLoginError):
provider._docker_login("ghcr.io")
assert "ghcr.io" not in provider._logged_in_registries
@patch("subprocess.run")
def test_login_docker_not_found(self, mock_subprocess):
"""Test docker binary not found raises ImageDockerNotFoundError."""
mock_subprocess.side_effect = FileNotFoundError("docker not found")
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
with pytest.raises(ImageDockerNotFoundError):
provider._docker_login("ghcr.io")
@patch("subprocess.run")
def test_login_password_via_stdin_not_args(self, mock_subprocess):
"""Test that password is passed via stdin, never in command args."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
provider = _make_provider(
registry_username="myuser",
registry_password="s3cret!",
)
provider._docker_login("ghcr.io")
call_args = mock_subprocess.call_args
cmd = call_args.args[0]
assert "s3cret!" not in cmd
assert call_args.kwargs["input"] == "s3cret!"
class TestDockerLogout:
@patch("subprocess.run")
def test_logout_success(self, mock_subprocess):
"""Test successful docker logout."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
provider = _make_provider()
provider._docker_logout("ghcr.io")
mock_subprocess.assert_called_once_with(
["docker", "logout", "ghcr.io"],
capture_output=True,
text=True,
)
@patch("subprocess.run")
def test_logout_docker_hub(self, mock_subprocess):
"""Test docker logout for Docker Hub (registry=None)."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
provider = _make_provider()
provider._docker_logout(None)
mock_subprocess.assert_called_once_with(
["docker", "logout"],
capture_output=True,
text=True,
)
@patch("subprocess.run")
def test_logout_failure_warns(self, mock_subprocess):
"""Test docker logout failure only warns, never raises."""
mock_subprocess.return_value = MagicMock(returncode=1, stderr="some error")
provider = _make_provider()
provider._docker_logout("ghcr.io")
@patch("subprocess.run")
def test_logout_exception_warns(self, mock_subprocess):
"""Test docker logout handles exceptions gracefully."""
mock_subprocess.side_effect = OSError("docker crashed")
provider = _make_provider()
provider._docker_logout("ghcr.io")
class TestCleanup:
@patch("subprocess.run")
def test_cleanup_calls_logout_for_each_registry(self, mock_subprocess):
"""Test cleanup calls docker logout for each logged-in registry."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
provider._logged_in_registries = {"ghcr.io", None}
provider.cleanup()
assert mock_subprocess.call_count == 2
assert len(provider._logged_in_registries) == 0
def test_cleanup_idempotent(self):
"""Test cleanup is safe to call multiple times."""
provider = _make_provider()
provider.cleanup()
provider.cleanup()
assert len(provider._logged_in_registries) == 0
@patch("subprocess.run")
def test_cleanup_clears_set(self, mock_subprocess):
"""Test cleanup clears the _logged_in_registries set."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
provider._logged_in_registries = {"ghcr.io"}
provider.cleanup()
assert provider._logged_in_registries == set()
class TestDockerPull:
@patch("subprocess.run")
def test_pull_success(self, mock_subprocess):
"""Test successful docker pull."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
provider._docker_pull("ghcr.io/user/image:tag")
mock_subprocess.assert_called_once_with(
["docker", "pull", "ghcr.io/user/image:tag"],
capture_output=True,
text=True,
)
@patch("subprocess.run")
def test_pull_failure_raises_scan_error(self, mock_subprocess):
"""Test docker pull failure raises ImageScanError."""
mock_subprocess.return_value = MagicMock(
returncode=1, stderr="Error: pull access denied"
)
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
with pytest.raises(ImageScanError):
provider._docker_pull("private/image:tag")
@patch("subprocess.run")
def test_pull_docker_not_found(self, mock_subprocess):
"""Test docker binary not found raises ImageDockerNotFoundError."""
mock_subprocess.side_effect = FileNotFoundError("docker not found")
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
with pytest.raises(ImageDockerNotFoundError):
provider._docker_pull("alpine:3.18")
class TestDockerLoginIntegration:
@patch("subprocess.run")
def test_run_scan_calls_docker_login_with_credentials(self, mock_subprocess):
"""Test that run_scan() calls docker login, docker pull, then trivy when credentials are set."""
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
provider = _make_provider(
images=["ghcr.io/user/image:tag"],
registry_username="myuser",
registry_password="mypass",
)
reports = []
for batch in provider.run_scan():
reports.extend(batch)
calls = mock_subprocess.call_args_list
assert calls[0].args[0][:2] == ["docker", "login"]
assert calls[1].args[0][:2] == ["docker", "pull"]
assert calls[2].args[0][:2] == ["trivy", "image"]
@patch("subprocess.run")
def test_run_scan_no_docker_login_without_credentials(self, mock_subprocess):
"""Test that run_scan() does NOT call docker login when no credentials."""
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
provider = _make_provider()
for batch in provider.run_scan():
pass
calls = mock_subprocess.call_args_list
assert all(call.args[0][0] == "trivy" for call in calls)
@patch("subprocess.run")
def test_run_scan_no_docker_login_with_token_only(self, mock_subprocess):
"""Test that run_scan() does NOT call docker login when only token is provided."""
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
provider = _make_provider(registry_token="my-token")
for batch in provider.run_scan():
pass
calls = mock_subprocess.call_args_list
assert all(call.args[0][0] == "trivy" for call in calls)
@patch("subprocess.run")
def test_run_calls_cleanup_on_success(self, mock_subprocess):
"""Test that run() calls cleanup after successful scan."""
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
provider = _make_provider(
images=["ghcr.io/user/image:tag"],
registry_username="myuser",
registry_password="mypass",
)
provider.run()
calls = mock_subprocess.call_args_list
# login, pull, trivy, logout
assert calls[0].args[0][:2] == ["docker", "login"]
assert calls[1].args[0][:2] == ["docker", "pull"]
assert calls[2].args[0][:2] == ["trivy", "image"]
assert calls[-1].args[0][:2] == ["docker", "logout"]
@patch("subprocess.run")
def test_run_calls_cleanup_on_error(self, mock_subprocess):
"""Test that run() calls cleanup even when scan errors."""
def side_effect(*args, **kwargs):
cmd = args[0]
if cmd[0] == "docker" and cmd[1] == "login":
return MagicMock(returncode=0, stderr="")
if cmd[0] == "docker" and cmd[1] == "pull":
return MagicMock(returncode=0, stderr="")
if cmd[0] == "trivy":
return MagicMock(returncode=1, stdout="", stderr="scan failed")
return MagicMock(returncode=0, stderr="")
mock_subprocess.side_effect = side_effect
provider = _make_provider(
images=["ghcr.io/user/image:tag"],
registry_username="myuser",
registry_password="mypass",
)
with pytest.raises(ImageScanError):
provider.run()
last_call = mock_subprocess.call_args_list[-1]
assert last_call.args[0][:2] == ["docker", "logout"]
@patch("subprocess.run")
def test_run_scan_deduplicates_registries(self, mock_subprocess):
"""Test that run_scan() deduplicates registries for docker login but pulls each image."""
mock_subprocess.return_value = MagicMock(
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
)
provider = _make_provider(
images=["ghcr.io/user/image1:tag", "ghcr.io/user/image2:tag"],
registry_username="myuser",
registry_password="mypass",
)
for batch in provider.run_scan():
pass
calls = mock_subprocess.call_args_list
docker_login_calls = [c for c in calls if c.args[0][:2] == ["docker", "login"]]
docker_pull_calls = [c for c in calls if c.args[0][:2] == ["docker", "pull"]]
trivy_calls = [c for c in calls if c.args[0][:2] == ["trivy", "image"]]
assert len(docker_login_calls) == 1
assert len(docker_pull_calls) == 2
assert len(trivy_calls) == 2
@patch("subprocess.run")
def test_test_connection_docker_login_docker_hub(self, mock_subprocess):
"""Test test_connection does docker login and pull for Docker Hub images."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
result = ImageProvider.test_connection(
image="andoniaf/test-private:tag",
registry_username="myuser",
registry_password="mypass",
)
assert result.is_connected is True
assert mock_subprocess.call_count == 4
login_call = mock_subprocess.call_args_list[0]
assert login_call.args[0] == [
"docker",
"login",
"--username",
"myuser",
"--password-stdin",
]
pull_call = mock_subprocess.call_args_list[1]
assert pull_call.args[0] == [
"docker",
"pull",
"andoniaf/test-private:tag",
]