mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-02-09 02:30:43 +00:00
Compare commits
1 Commits
feat/PROWL
...
feat/PROWL
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f7adbc2dcf |
@@ -284,6 +284,9 @@ class Provider(ABC):
|
||||
timeout=arguments.timeout,
|
||||
config_path=arguments.config_file,
|
||||
fixer_config=fixer_config,
|
||||
registry_username=arguments.registry_username,
|
||||
registry_password=arguments.registry_password,
|
||||
registry_token=arguments.registry_token,
|
||||
)
|
||||
elif "mongodbatlas" in provider_class_name.lower():
|
||||
provider_class(
|
||||
|
||||
@@ -30,6 +30,14 @@ class ImageBaseException(ProwlerException):
|
||||
"message": "Error scanning container image.",
|
||||
"remediation": "Check the image name and ensure it is accessible.",
|
||||
},
|
||||
(9006, "ImageDockerLoginError"): {
|
||||
"message": "Docker login failed for registry authentication.",
|
||||
"remediation": "Check your registry credentials and ensure the registry is reachable.",
|
||||
},
|
||||
(9007, "ImageDockerNotFoundError"): {
|
||||
"message": "Docker binary not found.",
|
||||
"remediation": "Install Docker to enable private registry authentication via docker login.",
|
||||
},
|
||||
}
|
||||
|
||||
def __init__(self, code, file=None, original_exception=None, message=None):
|
||||
@@ -97,3 +105,21 @@ class ImageScanError(ImageBaseException):
|
||||
super().__init__(
|
||||
9005, file=file, original_exception=original_exception, message=message
|
||||
)
|
||||
|
||||
|
||||
class ImageDockerLoginError(ImageBaseException):
|
||||
"""Exception raised when docker login fails."""
|
||||
|
||||
def __init__(self, file=None, original_exception=None, message=None):
|
||||
super().__init__(
|
||||
9006, file=file, original_exception=original_exception, message=message
|
||||
)
|
||||
|
||||
|
||||
class ImageDockerNotFoundError(ImageBaseException):
|
||||
"""Exception raised when the docker binary is not found."""
|
||||
|
||||
def __init__(self, file=None, original_exception=None, message=None):
|
||||
super().__init__(
|
||||
9007, file=file, original_exception=original_exception, message=message
|
||||
)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from typing import Generator, List
|
||||
@@ -18,6 +19,8 @@ from prowler.lib.utils.utils import print_boxes
|
||||
from prowler.providers.common.models import Audit_Metadata, Connection
|
||||
from prowler.providers.common.provider import Provider
|
||||
from prowler.providers.image.exceptions.exceptions import (
|
||||
ImageDockerLoginError,
|
||||
ImageDockerNotFoundError,
|
||||
ImageFindingProcessingError,
|
||||
ImageListFileNotFoundError,
|
||||
ImageListFileReadError,
|
||||
@@ -49,6 +52,9 @@ class ImageProvider(Provider):
|
||||
config_path: str = None,
|
||||
config_content: dict = None,
|
||||
fixer_config: dict | None = None,
|
||||
registry_username: str = None,
|
||||
registry_password: str = None,
|
||||
registry_token: str = None,
|
||||
):
|
||||
logger.info("Instantiating Image Provider...")
|
||||
|
||||
@@ -62,7 +68,21 @@ class ImageProvider(Provider):
|
||||
self.audited_account = "image-scan"
|
||||
self._session = None
|
||||
self._identity = "prowler"
|
||||
self._auth_method = "No auth"
|
||||
|
||||
# Registry authentication (follows IaC pattern: explicit params, env vars internal)
|
||||
self.registry_username = registry_username or os.environ.get("TRIVY_USERNAME")
|
||||
self.registry_password = registry_password or os.environ.get("TRIVY_PASSWORD")
|
||||
self.registry_token = registry_token or os.environ.get("TRIVY_REGISTRY_TOKEN")
|
||||
self._logged_in_registries: set = set()
|
||||
|
||||
if self.registry_username and self.registry_password:
|
||||
self._auth_method = "Docker login"
|
||||
logger.info("Using docker login for registry authentication")
|
||||
elif self.registry_token:
|
||||
self._auth_method = "Registry token"
|
||||
logger.info("Using registry token for authentication")
|
||||
else:
|
||||
self._auth_method = "No auth"
|
||||
|
||||
# Load images from file if provided
|
||||
if image_list_file:
|
||||
@@ -150,6 +170,100 @@ class ImageProvider(Provider):
|
||||
"""Image provider doesn't need a session since it uses Trivy directly"""
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _extract_registry(image: str) -> str | None:
|
||||
"""Extract registry hostname from an image reference.
|
||||
|
||||
Returns None for Docker Hub images (no registry prefix).
|
||||
"""
|
||||
parts = image.split("/")
|
||||
if len(parts) >= 2 and ("." in parts[0] or ":" in parts[0]):
|
||||
return parts[0]
|
||||
return None
|
||||
|
||||
def _docker_login(self, registry: str | None) -> None:
|
||||
"""Run docker login --password-stdin for a registry."""
|
||||
registry_label = registry or "Docker Hub"
|
||||
cmd = [
|
||||
"docker",
|
||||
"login",
|
||||
"--username",
|
||||
self.registry_username,
|
||||
"--password-stdin",
|
||||
]
|
||||
if registry:
|
||||
cmd.append(registry)
|
||||
|
||||
try:
|
||||
process = subprocess.run(
|
||||
cmd,
|
||||
input=self.registry_password,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
if process.returncode != 0:
|
||||
raise ImageDockerLoginError(
|
||||
file=__file__,
|
||||
message=f"docker login failed for {registry_label}: {process.stderr.strip()}",
|
||||
)
|
||||
logger.info(f"Docker login successful for {registry_label}")
|
||||
self._logged_in_registries.add(registry)
|
||||
except FileNotFoundError:
|
||||
raise ImageDockerNotFoundError(
|
||||
file=__file__,
|
||||
message="Docker binary not found. Install Docker to use registry authentication.",
|
||||
)
|
||||
|
||||
def _docker_pull(self, image: str) -> None:
|
||||
"""Pull an image using docker pull so Trivy can scan it from the local daemon.
|
||||
|
||||
Trivy's remote source cannot authenticate against Docker Hub (and some
|
||||
other registries) even after docker login. Pulling into the local
|
||||
Docker daemon first lets Trivy find the image via its "docker" source.
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Pulling image {image} via docker...")
|
||||
process = subprocess.run(
|
||||
["docker", "pull", image],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
if process.returncode != 0:
|
||||
raise ImageScanError(
|
||||
file=__file__,
|
||||
message=f"docker pull failed for {image}: {process.stderr.strip()}",
|
||||
)
|
||||
logger.info(f"Successfully pulled {image}")
|
||||
except FileNotFoundError:
|
||||
raise ImageDockerNotFoundError(
|
||||
file=__file__,
|
||||
message="Docker binary not found. Install Docker to use registry authentication.",
|
||||
)
|
||||
|
||||
def _docker_logout(self, registry: str | None) -> None:
|
||||
"""Run docker logout for a registry. Logs warning on failure, never raises."""
|
||||
registry_label = registry or "Docker Hub"
|
||||
cmd = ["docker", "logout"]
|
||||
if registry:
|
||||
cmd.append(registry)
|
||||
|
||||
try:
|
||||
process = subprocess.run(cmd, capture_output=True, text=True)
|
||||
if process.returncode != 0:
|
||||
logger.warning(
|
||||
f"docker logout failed for {registry_label}: {process.stderr.strip()}"
|
||||
)
|
||||
else:
|
||||
logger.info(f"Docker logout successful for {registry_label}")
|
||||
except Exception as error:
|
||||
logger.warning(f"docker logout failed for {registry_label}: {error}")
|
||||
|
||||
def cleanup(self) -> None:
|
||||
"""Logout from all registries that were logged in during the scan."""
|
||||
for registry in self._logged_in_registries:
|
||||
self._docker_logout(registry)
|
||||
self._logged_in_registries.clear()
|
||||
|
||||
def _process_finding(
|
||||
self, finding: dict, image_name: str, finding_type: str
|
||||
) -> CheckReportImage:
|
||||
@@ -273,10 +387,13 @@ class ImageProvider(Provider):
|
||||
|
||||
def run(self) -> List[CheckReportImage]:
|
||||
"""Execute the container image scan."""
|
||||
reports = []
|
||||
for batch in self.run_scan():
|
||||
reports.extend(batch)
|
||||
return reports
|
||||
try:
|
||||
reports = []
|
||||
for batch in self.run_scan():
|
||||
reports.extend(batch)
|
||||
return reports
|
||||
finally:
|
||||
self.cleanup()
|
||||
|
||||
def run_scan(self) -> Generator[List[CheckReportImage], None, None]:
|
||||
"""
|
||||
@@ -285,6 +402,13 @@ class ImageProvider(Provider):
|
||||
Yields:
|
||||
List[CheckReportImage]: Batches of findings
|
||||
"""
|
||||
if self.registry_username and self.registry_password:
|
||||
registries = {self._extract_registry(img) for img in self.images}
|
||||
for registry in registries:
|
||||
self._docker_login(registry)
|
||||
for image in self.images:
|
||||
self._docker_pull(image)
|
||||
|
||||
for image in self.images:
|
||||
try:
|
||||
yield from self._scan_single_image(image)
|
||||
@@ -407,8 +531,22 @@ class ImageProvider(Provider):
|
||||
)
|
||||
logger.error(f"Error scanning image {image}: {error}")
|
||||
|
||||
def _build_trivy_env(self) -> dict:
|
||||
"""Build environment variables for Trivy, injecting registry credentials.
|
||||
|
||||
When username+password are provided, we rely on docker login (which stores
|
||||
credentials in Docker's credential store). Setting TRIVY_USERNAME/TRIVY_PASSWORD
|
||||
would cause Trivy to attempt its own token-exchange auth, which fails for
|
||||
Docker Hub and other registries that require docker login.
|
||||
"""
|
||||
env = dict(os.environ)
|
||||
if self.registry_token:
|
||||
env["TRIVY_REGISTRY_TOKEN"] = self.registry_token
|
||||
return env
|
||||
|
||||
def _execute_trivy(self, command: list, image: str) -> subprocess.CompletedProcess:
|
||||
"""Execute Trivy command with optional progress bar."""
|
||||
env = self._build_trivy_env()
|
||||
try:
|
||||
if sys.stdout.isatty():
|
||||
with alive_bar(
|
||||
@@ -423,6 +561,7 @@ class ImageProvider(Provider):
|
||||
command,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
env=env,
|
||||
)
|
||||
bar.title = f"-> Scan completed for {image}"
|
||||
return process
|
||||
@@ -432,12 +571,13 @@ class ImageProvider(Provider):
|
||||
command,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
env=env,
|
||||
)
|
||||
logger.info(f"Scan completed for {image}")
|
||||
return process
|
||||
except (AttributeError, OSError):
|
||||
logger.info(f"Scanning {image}...")
|
||||
return subprocess.run(command, capture_output=True, text=True)
|
||||
return subprocess.run(command, capture_output=True, text=True, env=env)
|
||||
|
||||
def _log_trivy_stderr(self, stderr: str) -> None:
|
||||
"""Parse and log Trivy's stderr output."""
|
||||
@@ -507,6 +647,10 @@ class ImageProvider(Provider):
|
||||
|
||||
report_lines.append(f"Timeout: {Fore.YELLOW}{self.timeout}{Style.RESET_ALL}")
|
||||
|
||||
report_lines.append(
|
||||
f"Authentication method: {Fore.YELLOW}{self.auth_method}{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
print_boxes(report_lines, report_title)
|
||||
|
||||
@staticmethod
|
||||
@@ -514,6 +658,9 @@ class ImageProvider(Provider):
|
||||
image: str = None,
|
||||
raise_on_exception: bool = True,
|
||||
provider_id: str = None,
|
||||
registry_username: str = None,
|
||||
registry_password: str = None,
|
||||
registry_token: str = None,
|
||||
) -> "Connection":
|
||||
"""
|
||||
Test connection to container registry by attempting to inspect an image.
|
||||
@@ -522,10 +669,15 @@ class ImageProvider(Provider):
|
||||
image: Container image to test
|
||||
raise_on_exception: Whether to raise exceptions
|
||||
provider_id: Fallback for image name
|
||||
registry_username: Registry username for basic auth
|
||||
registry_password: Registry password for basic auth
|
||||
registry_token: Registry token for token-based auth
|
||||
|
||||
Returns:
|
||||
Connection: Connection object with success status
|
||||
"""
|
||||
docker_logged_in = False
|
||||
registry = None
|
||||
try:
|
||||
if provider_id and not image:
|
||||
image = provider_id
|
||||
@@ -533,6 +685,55 @@ class ImageProvider(Provider):
|
||||
if not image:
|
||||
return Connection(is_connected=False, error="Image name is required")
|
||||
|
||||
# Build env with registry credentials
|
||||
env = dict(os.environ)
|
||||
if registry_username and registry_password:
|
||||
# Docker login for reliable private registry auth
|
||||
registry = ImageProvider._extract_registry(image)
|
||||
registry_label = registry or "Docker Hub"
|
||||
login_cmd = [
|
||||
"docker",
|
||||
"login",
|
||||
"--username",
|
||||
registry_username,
|
||||
"--password-stdin",
|
||||
]
|
||||
if registry:
|
||||
login_cmd.append(registry)
|
||||
try:
|
||||
login_result = subprocess.run(
|
||||
login_cmd,
|
||||
input=registry_password,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
if login_result.returncode == 0:
|
||||
docker_logged_in = True
|
||||
logger.info(
|
||||
f"Docker login successful for {registry_label} (test_connection)"
|
||||
)
|
||||
# Pull the image so Trivy can scan from the local daemon
|
||||
pull_result = subprocess.run(
|
||||
["docker", "pull", image],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
if pull_result.returncode != 0:
|
||||
return Connection(
|
||||
is_connected=False,
|
||||
error=f"docker pull failed for {image}: {pull_result.stderr.strip()}",
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"Docker login failed for {registry_label}: {login_result.stderr.strip()}"
|
||||
)
|
||||
except FileNotFoundError:
|
||||
logger.warning(
|
||||
"Docker binary not found, skipping docker login for test_connection"
|
||||
)
|
||||
elif registry_token:
|
||||
env["TRIVY_REGISTRY_TOKEN"] = registry_token
|
||||
|
||||
# Test by running trivy with --skip-update to just test image access
|
||||
process = subprocess.run(
|
||||
[
|
||||
@@ -545,6 +746,7 @@ class ImageProvider(Provider):
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=60,
|
||||
env=env,
|
||||
)
|
||||
|
||||
if process.returncode == 0:
|
||||
@@ -584,3 +786,12 @@ class ImageProvider(Provider):
|
||||
is_connected=False,
|
||||
error=f"Unexpected error: {str(error)}",
|
||||
)
|
||||
finally:
|
||||
if docker_logged_in:
|
||||
logout_cmd = ["docker", "logout"]
|
||||
if registry:
|
||||
logout_cmd.append(registry)
|
||||
try:
|
||||
subprocess.run(logout_cmd, capture_output=True, text=True)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import os
|
||||
import tempfile
|
||||
from unittest import mock
|
||||
from unittest.mock import MagicMock, patch
|
||||
@@ -6,6 +7,8 @@ import pytest
|
||||
|
||||
from prowler.lib.check.models import CheckReportImage
|
||||
from prowler.providers.image.exceptions.exceptions import (
|
||||
ImageDockerLoginError,
|
||||
ImageDockerNotFoundError,
|
||||
ImageListFileNotFoundError,
|
||||
ImageNoImagesProvidedError,
|
||||
ImageScanError,
|
||||
@@ -395,3 +398,591 @@ class TestImageProvider:
|
||||
with pytest.raises(ImageScanError):
|
||||
for _ in provider.run_scan():
|
||||
pass
|
||||
|
||||
|
||||
class TestImageProviderRegistryAuth:
|
||||
def test_no_auth_by_default(self):
|
||||
"""Test that no auth is set when no credentials are provided."""
|
||||
provider = _make_provider()
|
||||
|
||||
assert provider.registry_username is None
|
||||
assert provider.registry_password is None
|
||||
assert provider.registry_token is None
|
||||
assert provider.auth_method == "No auth"
|
||||
|
||||
def test_basic_auth_with_explicit_params(self):
|
||||
"""Test basic auth via explicit constructor params."""
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
assert provider.registry_username == "myuser"
|
||||
assert provider.registry_password == "mypass"
|
||||
assert provider.auth_method == "Docker login"
|
||||
|
||||
def test_token_auth_with_explicit_param(self):
|
||||
"""Test token auth via explicit constructor param."""
|
||||
provider = _make_provider(registry_token="my-token-123")
|
||||
|
||||
assert provider.registry_token == "my-token-123"
|
||||
assert provider.auth_method == "Registry token"
|
||||
|
||||
def test_basic_auth_takes_precedence_over_token(self):
|
||||
"""Test that username/password takes precedence over token."""
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
registry_token="my-token",
|
||||
)
|
||||
|
||||
assert provider.auth_method == "Docker login"
|
||||
|
||||
@patch.dict(os.environ, {"TRIVY_USERNAME": "envuser", "TRIVY_PASSWORD": "envpass"})
|
||||
def test_basic_auth_from_env_vars(self):
|
||||
"""Test that env vars are used as fallback for basic auth."""
|
||||
provider = _make_provider()
|
||||
|
||||
assert provider.registry_username == "envuser"
|
||||
assert provider.registry_password == "envpass"
|
||||
assert provider.auth_method == "Docker login"
|
||||
|
||||
@patch.dict(os.environ, {"TRIVY_REGISTRY_TOKEN": "env-token"})
|
||||
def test_token_auth_from_env_var(self):
|
||||
"""Test that env var is used as fallback for token auth."""
|
||||
provider = _make_provider()
|
||||
|
||||
assert provider.registry_token == "env-token"
|
||||
assert provider.auth_method == "Registry token"
|
||||
|
||||
@patch.dict(os.environ, {"TRIVY_USERNAME": "envuser", "TRIVY_PASSWORD": "envpass"})
|
||||
def test_explicit_params_override_env_vars(self):
|
||||
"""Test that explicit params take precedence over env vars."""
|
||||
provider = _make_provider(
|
||||
registry_username="explicit",
|
||||
registry_password="explicit-pass",
|
||||
)
|
||||
|
||||
assert provider.registry_username == "explicit"
|
||||
assert provider.registry_password == "explicit-pass"
|
||||
|
||||
def test_build_trivy_env_no_auth(self):
|
||||
"""Test that _build_trivy_env returns base env when no auth."""
|
||||
provider = _make_provider()
|
||||
env = provider._build_trivy_env()
|
||||
|
||||
assert "TRIVY_USERNAME" not in env
|
||||
assert "TRIVY_PASSWORD" not in env
|
||||
assert "TRIVY_REGISTRY_TOKEN" not in env
|
||||
|
||||
def test_build_trivy_env_basic_auth_no_env_vars(self):
|
||||
"""Test that _build_trivy_env does NOT inject TRIVY_USERNAME/PASSWORD (docker login handles it)."""
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
env = provider._build_trivy_env()
|
||||
|
||||
assert "TRIVY_USERNAME" not in env
|
||||
assert "TRIVY_PASSWORD" not in env
|
||||
|
||||
def test_build_trivy_env_token_auth(self):
|
||||
"""Test that _build_trivy_env injects registry token."""
|
||||
provider = _make_provider(registry_token="my-token")
|
||||
env = provider._build_trivy_env()
|
||||
|
||||
assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_execute_trivy_no_trivy_env_with_basic_auth(self, mock_subprocess):
|
||||
"""Test that _execute_trivy does NOT set TRIVY_USERNAME/PASSWORD (docker login handles auth)."""
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
|
||||
provider._execute_trivy(["trivy", "image", "alpine:3.18"], "alpine:3.18")
|
||||
|
||||
call_kwargs = mock_subprocess.call_args
|
||||
env = call_kwargs.kwargs.get("env") or call_kwargs[1].get("env")
|
||||
assert "TRIVY_USERNAME" not in env
|
||||
assert "TRIVY_PASSWORD" not in env
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_test_connection_with_basic_auth(self, mock_subprocess):
|
||||
"""Test test_connection does docker login, pull, trivy, logout and does NOT set TRIVY_* env vars."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
|
||||
|
||||
result = ImageProvider.test_connection(
|
||||
image="private.registry.io/myapp:v1",
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
assert result.is_connected is True
|
||||
# Should have 4 subprocess calls: docker login, docker pull, trivy, docker logout
|
||||
assert mock_subprocess.call_count == 4
|
||||
login_call = mock_subprocess.call_args_list[0]
|
||||
assert login_call.args[0] == [
|
||||
"docker",
|
||||
"login",
|
||||
"--username",
|
||||
"myuser",
|
||||
"--password-stdin",
|
||||
"private.registry.io",
|
||||
]
|
||||
assert login_call.kwargs["input"] == "mypass"
|
||||
|
||||
pull_call = mock_subprocess.call_args_list[1]
|
||||
assert pull_call.args[0] == ["docker", "pull", "private.registry.io/myapp:v1"]
|
||||
|
||||
trivy_call = mock_subprocess.call_args_list[2]
|
||||
env = trivy_call.kwargs.get("env") or trivy_call[1].get("env")
|
||||
assert "TRIVY_USERNAME" not in env
|
||||
assert "TRIVY_PASSWORD" not in env
|
||||
|
||||
logout_call = mock_subprocess.call_args_list[3]
|
||||
assert logout_call.args[0] == ["docker", "logout", "private.registry.io"]
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_test_connection_with_token(self, mock_subprocess):
|
||||
"""Test test_connection passes token via env."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
|
||||
|
||||
result = ImageProvider.test_connection(
|
||||
image="private.registry.io/myapp:v1",
|
||||
registry_token="my-token",
|
||||
)
|
||||
|
||||
assert result.is_connected is True
|
||||
call_kwargs = mock_subprocess.call_args
|
||||
env = call_kwargs.kwargs.get("env") or call_kwargs[1].get("env")
|
||||
assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
|
||||
|
||||
def test_print_credentials_shows_auth_method(self):
|
||||
"""Test that print_credentials outputs the auth method."""
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
with mock.patch("builtins.print") as mock_print:
|
||||
provider.print_credentials()
|
||||
output = " ".join(
|
||||
str(call.args[0]) for call in mock_print.call_args_list if call.args
|
||||
)
|
||||
assert "Docker login" in output
|
||||
|
||||
|
||||
class TestExtractRegistry:
|
||||
def test_docker_hub_simple(self):
|
||||
assert ImageProvider._extract_registry("alpine:3.18") is None
|
||||
|
||||
def test_docker_hub_with_namespace(self):
|
||||
assert ImageProvider._extract_registry("andoniaf/test-private:tag") is None
|
||||
|
||||
def test_ghcr(self):
|
||||
assert ImageProvider._extract_registry("ghcr.io/user/image:tag") == "ghcr.io"
|
||||
|
||||
def test_ecr(self):
|
||||
assert (
|
||||
ImageProvider._extract_registry(
|
||||
"123456789012.dkr.ecr.us-east-1.amazonaws.com/repo:tag"
|
||||
)
|
||||
== "123456789012.dkr.ecr.us-east-1.amazonaws.com"
|
||||
)
|
||||
|
||||
def test_localhost_with_port(self):
|
||||
assert (
|
||||
ImageProvider._extract_registry("localhost:5000/myimage:latest")
|
||||
== "localhost:5000"
|
||||
)
|
||||
|
||||
def test_custom_registry_with_port(self):
|
||||
assert (
|
||||
ImageProvider._extract_registry("myregistry.io:5000/image:tag")
|
||||
== "myregistry.io:5000"
|
||||
)
|
||||
|
||||
def test_digest_reference(self):
|
||||
assert (
|
||||
ImageProvider._extract_registry("ghcr.io/user/image@sha256:abc123")
|
||||
== "ghcr.io"
|
||||
)
|
||||
|
||||
def test_bare_image_name(self):
|
||||
assert ImageProvider._extract_registry("nginx") is None
|
||||
|
||||
|
||||
class TestDockerLogin:
|
||||
@patch("subprocess.run")
|
||||
def test_login_success(self, mock_subprocess):
|
||||
"""Test successful docker login."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
provider._docker_login("ghcr.io")
|
||||
|
||||
mock_subprocess.assert_called_once()
|
||||
call_args = mock_subprocess.call_args
|
||||
assert call_args.args[0] == [
|
||||
"docker",
|
||||
"login",
|
||||
"--username",
|
||||
"myuser",
|
||||
"--password-stdin",
|
||||
"ghcr.io",
|
||||
]
|
||||
assert call_args.kwargs["input"] == "mypass"
|
||||
assert "ghcr.io" in provider._logged_in_registries
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_login_docker_hub(self, mock_subprocess):
|
||||
"""Test docker login for Docker Hub (registry=None)."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
provider._docker_login(None)
|
||||
|
||||
call_args = mock_subprocess.call_args
|
||||
assert call_args.args[0] == [
|
||||
"docker",
|
||||
"login",
|
||||
"--username",
|
||||
"myuser",
|
||||
"--password-stdin",
|
||||
]
|
||||
assert None in provider._logged_in_registries
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_login_failure_raises(self, mock_subprocess):
|
||||
"""Test docker login failure raises ImageDockerLoginError."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=1, stderr="unauthorized: incorrect username or password"
|
||||
)
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="badpass",
|
||||
)
|
||||
|
||||
with pytest.raises(ImageDockerLoginError):
|
||||
provider._docker_login("ghcr.io")
|
||||
|
||||
assert "ghcr.io" not in provider._logged_in_registries
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_login_docker_not_found(self, mock_subprocess):
|
||||
"""Test docker binary not found raises ImageDockerNotFoundError."""
|
||||
mock_subprocess.side_effect = FileNotFoundError("docker not found")
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
with pytest.raises(ImageDockerNotFoundError):
|
||||
provider._docker_login("ghcr.io")
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_login_password_via_stdin_not_args(self, mock_subprocess):
|
||||
"""Test that password is passed via stdin, never in command args."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="s3cret!",
|
||||
)
|
||||
|
||||
provider._docker_login("ghcr.io")
|
||||
|
||||
call_args = mock_subprocess.call_args
|
||||
cmd = call_args.args[0]
|
||||
assert "s3cret!" not in cmd
|
||||
assert call_args.kwargs["input"] == "s3cret!"
|
||||
|
||||
|
||||
class TestDockerLogout:
|
||||
@patch("subprocess.run")
|
||||
def test_logout_success(self, mock_subprocess):
|
||||
"""Test successful docker logout."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
|
||||
provider = _make_provider()
|
||||
|
||||
provider._docker_logout("ghcr.io")
|
||||
|
||||
mock_subprocess.assert_called_once_with(
|
||||
["docker", "logout", "ghcr.io"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_logout_docker_hub(self, mock_subprocess):
|
||||
"""Test docker logout for Docker Hub (registry=None)."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
|
||||
provider = _make_provider()
|
||||
|
||||
provider._docker_logout(None)
|
||||
|
||||
mock_subprocess.assert_called_once_with(
|
||||
["docker", "logout"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_logout_failure_warns(self, mock_subprocess):
|
||||
"""Test docker logout failure only warns, never raises."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=1, stderr="some error")
|
||||
provider = _make_provider()
|
||||
|
||||
provider._docker_logout("ghcr.io")
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_logout_exception_warns(self, mock_subprocess):
|
||||
"""Test docker logout handles exceptions gracefully."""
|
||||
mock_subprocess.side_effect = OSError("docker crashed")
|
||||
provider = _make_provider()
|
||||
|
||||
provider._docker_logout("ghcr.io")
|
||||
|
||||
|
||||
class TestCleanup:
|
||||
@patch("subprocess.run")
|
||||
def test_cleanup_calls_logout_for_each_registry(self, mock_subprocess):
|
||||
"""Test cleanup calls docker logout for each logged-in registry."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
provider._logged_in_registries = {"ghcr.io", None}
|
||||
|
||||
provider.cleanup()
|
||||
|
||||
assert mock_subprocess.call_count == 2
|
||||
assert len(provider._logged_in_registries) == 0
|
||||
|
||||
def test_cleanup_idempotent(self):
|
||||
"""Test cleanup is safe to call multiple times."""
|
||||
provider = _make_provider()
|
||||
|
||||
provider.cleanup()
|
||||
provider.cleanup()
|
||||
|
||||
assert len(provider._logged_in_registries) == 0
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_cleanup_clears_set(self, mock_subprocess):
|
||||
"""Test cleanup clears the _logged_in_registries set."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
provider._logged_in_registries = {"ghcr.io"}
|
||||
|
||||
provider.cleanup()
|
||||
|
||||
assert provider._logged_in_registries == set()
|
||||
|
||||
|
||||
class TestDockerPull:
|
||||
@patch("subprocess.run")
|
||||
def test_pull_success(self, mock_subprocess):
|
||||
"""Test successful docker pull."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
provider._docker_pull("ghcr.io/user/image:tag")
|
||||
|
||||
mock_subprocess.assert_called_once_with(
|
||||
["docker", "pull", "ghcr.io/user/image:tag"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_pull_failure_raises_scan_error(self, mock_subprocess):
|
||||
"""Test docker pull failure raises ImageScanError."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=1, stderr="Error: pull access denied"
|
||||
)
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
with pytest.raises(ImageScanError):
|
||||
provider._docker_pull("private/image:tag")
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_pull_docker_not_found(self, mock_subprocess):
|
||||
"""Test docker binary not found raises ImageDockerNotFoundError."""
|
||||
mock_subprocess.side_effect = FileNotFoundError("docker not found")
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
with pytest.raises(ImageDockerNotFoundError):
|
||||
provider._docker_pull("alpine:3.18")
|
||||
|
||||
|
||||
class TestDockerLoginIntegration:
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_calls_docker_login_with_credentials(self, mock_subprocess):
|
||||
"""Test that run_scan() calls docker login, docker pull, then trivy when credentials are set."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
provider = _make_provider(
|
||||
images=["ghcr.io/user/image:tag"],
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
reports = []
|
||||
for batch in provider.run_scan():
|
||||
reports.extend(batch)
|
||||
|
||||
calls = mock_subprocess.call_args_list
|
||||
assert calls[0].args[0][:2] == ["docker", "login"]
|
||||
assert calls[1].args[0][:2] == ["docker", "pull"]
|
||||
assert calls[2].args[0][:2] == ["trivy", "image"]
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_no_docker_login_without_credentials(self, mock_subprocess):
|
||||
"""Test that run_scan() does NOT call docker login when no credentials."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
provider = _make_provider()
|
||||
|
||||
for batch in provider.run_scan():
|
||||
pass
|
||||
|
||||
calls = mock_subprocess.call_args_list
|
||||
assert all(call.args[0][0] == "trivy" for call in calls)
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_no_docker_login_with_token_only(self, mock_subprocess):
|
||||
"""Test that run_scan() does NOT call docker login when only token is provided."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
provider = _make_provider(registry_token="my-token")
|
||||
|
||||
for batch in provider.run_scan():
|
||||
pass
|
||||
|
||||
calls = mock_subprocess.call_args_list
|
||||
assert all(call.args[0][0] == "trivy" for call in calls)
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_run_calls_cleanup_on_success(self, mock_subprocess):
|
||||
"""Test that run() calls cleanup after successful scan."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
provider = _make_provider(
|
||||
images=["ghcr.io/user/image:tag"],
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
provider.run()
|
||||
|
||||
calls = mock_subprocess.call_args_list
|
||||
# login, pull, trivy, logout
|
||||
assert calls[0].args[0][:2] == ["docker", "login"]
|
||||
assert calls[1].args[0][:2] == ["docker", "pull"]
|
||||
assert calls[2].args[0][:2] == ["trivy", "image"]
|
||||
assert calls[-1].args[0][:2] == ["docker", "logout"]
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_run_calls_cleanup_on_error(self, mock_subprocess):
|
||||
"""Test that run() calls cleanup even when scan errors."""
|
||||
|
||||
def side_effect(*args, **kwargs):
|
||||
cmd = args[0]
|
||||
if cmd[0] == "docker" and cmd[1] == "login":
|
||||
return MagicMock(returncode=0, stderr="")
|
||||
if cmd[0] == "docker" and cmd[1] == "pull":
|
||||
return MagicMock(returncode=0, stderr="")
|
||||
if cmd[0] == "trivy":
|
||||
return MagicMock(returncode=1, stdout="", stderr="scan failed")
|
||||
return MagicMock(returncode=0, stderr="")
|
||||
|
||||
mock_subprocess.side_effect = side_effect
|
||||
provider = _make_provider(
|
||||
images=["ghcr.io/user/image:tag"],
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
with pytest.raises(ImageScanError):
|
||||
provider.run()
|
||||
|
||||
last_call = mock_subprocess.call_args_list[-1]
|
||||
assert last_call.args[0][:2] == ["docker", "logout"]
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_deduplicates_registries(self, mock_subprocess):
|
||||
"""Test that run_scan() deduplicates registries for docker login but pulls each image."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
provider = _make_provider(
|
||||
images=["ghcr.io/user/image1:tag", "ghcr.io/user/image2:tag"],
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
for batch in provider.run_scan():
|
||||
pass
|
||||
|
||||
calls = mock_subprocess.call_args_list
|
||||
docker_login_calls = [c for c in calls if c.args[0][:2] == ["docker", "login"]]
|
||||
docker_pull_calls = [c for c in calls if c.args[0][:2] == ["docker", "pull"]]
|
||||
trivy_calls = [c for c in calls if c.args[0][:2] == ["trivy", "image"]]
|
||||
assert len(docker_login_calls) == 1
|
||||
assert len(docker_pull_calls) == 2
|
||||
assert len(trivy_calls) == 2
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_test_connection_docker_login_docker_hub(self, mock_subprocess):
|
||||
"""Test test_connection does docker login and pull for Docker Hub images."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
|
||||
|
||||
result = ImageProvider.test_connection(
|
||||
image="andoniaf/test-private:tag",
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
assert result.is_connected is True
|
||||
assert mock_subprocess.call_count == 4
|
||||
login_call = mock_subprocess.call_args_list[0]
|
||||
assert login_call.args[0] == [
|
||||
"docker",
|
||||
"login",
|
||||
"--username",
|
||||
"myuser",
|
||||
"--password-stdin",
|
||||
]
|
||||
pull_call = mock_subprocess.call_args_list[1]
|
||||
assert pull_call.args[0] == [
|
||||
"docker",
|
||||
"pull",
|
||||
"andoniaf/test-private:tag",
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user