Compare commits

...

4 Commits

Author SHA1 Message Date
Andoni A.
b48b381624 feat(image): add private registry authentication support
Add registry_username, registry_password, and registry_token params
to ImageProvider following the IaC provider pattern (explicit params
with env var fallback). Credentials are injected as environment
variables into Trivy subprocess calls.
2026-02-06 08:18:05 +01:00
Andoni A.
9d5e3f4758 fix(image): replace sys.exit calls with exceptions, fix mutable defaults, add tests
- Create exceptions module (codes 9000-9005) following OCI provider pattern
- Replace all sys.exit(1) calls with typed exceptions
- Fix mutable default arguments in ImageProvider and CheckReportImage
- Add return type hints to all properties and methods
- Add ResourceGroup field to metadata dict
- Add test suite with 23 test cases covering initialization, finding
  processing, scan execution, error handling, and connection testing
- Update CHANGELOG with container image provider entry
2026-02-05 20:37:29 +01:00
Andoni A.
beb74a6459 Merge remote-tracking branch 'origin/master' into image-scan-poc 2026-02-05 19:20:34 +01:00
Andoni A.
f42de0d21b feat(image): add container image provider POC
Add initial proof of concept for a container image security scanning
provider that uses Trivy for vulnerability detection in container images.
2026-01-29 08:41:11 +01:00
20 changed files with 1787 additions and 11 deletions

View File

@@ -20,6 +20,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- CIS 1.12 compliance framework for Kubernetes [(#9778)](https://github.com/prowler-cloud/prowler/pull/9778)
- CIS 6.0 for M365 provider [(#9779)](https://github.com/prowler-cloud/prowler/pull/9779)
- CIS 5.0 compliance framework for the Azure provider [(#9777)](https://github.com/prowler-cloud/prowler/pull/9777)
- Container Image provider (POC) using Trivy for vulnerability and secret scanning
### Changed
- Update AWS Step Functions service metadata to new format [(#9432)](https://github.com/prowler-cloud/prowler/pull/9432)

View File

@@ -118,6 +118,7 @@ from prowler.providers.common.quick_inventory import run_provider_quick_inventor
from prowler.providers.gcp.models import GCPOutputOptions
from prowler.providers.github.models import GithubOutputOptions
from prowler.providers.iac.models import IACOutputOptions
from prowler.providers.image.models import ImageOutputOptions
from prowler.providers.kubernetes.models import KubernetesOutputOptions
from prowler.providers.llm.models import LLMOutputOptions
from prowler.providers.m365.models import M365OutputOptions
@@ -204,8 +205,8 @@ def prowler():
# Load compliance frameworks
logger.debug("Loading compliance frameworks from .json files")
# Skip compliance frameworks for IAC and LLM providers
if provider != "iac" and provider != "llm":
# Skip compliance frameworks for IAC, LLM, and Image providers
if provider not in ("iac", "llm", "image"):
bulk_compliance_frameworks = Compliance.get_bulk(provider)
# Complete checks metadata with the compliance framework specification
bulk_checks_metadata = update_checks_metadata_with_compliance(
@@ -262,8 +263,8 @@ def prowler():
if not args.only_logs:
global_provider.print_credentials()
# Skip service and check loading for IAC and LLM providers
if provider != "iac" and provider != "llm":
# Skip service and check loading for IAC, LLM, and Image providers
if provider not in ("iac", "llm", "image"):
# Import custom checks from folder
if checks_folder:
custom_checks = parse_checks_from_folder(global_provider, checks_folder)
@@ -346,6 +347,8 @@ def prowler():
)
elif provider == "iac":
output_options = IACOutputOptions(args, bulk_checks_metadata)
elif provider == "image":
output_options = ImageOutputOptions(args, bulk_checks_metadata)
elif provider == "llm":
output_options = LLMOutputOptions(args, bulk_checks_metadata)
elif provider == "oraclecloud":
@@ -365,8 +368,8 @@ def prowler():
# Execute checks
findings = []
if provider == "iac" or provider == "llm":
# For IAC and LLM providers, run the scan directly
if provider in ("iac", "llm", "image"):
# For IAC, LLM, and Image providers, run the scan directly
if provider == "llm":
def streaming_callback(findings_batch):

View File

@@ -73,7 +73,10 @@ def get_available_compliance_frameworks(provider=None):
if provider:
providers = [provider]
for provider in providers:
with os.scandir(f"{actual_directory}/../compliance/{provider}") as files:
compliance_dir = f"{actual_directory}/../compliance/{provider}"
if not os.path.isdir(compliance_dir):
continue
with os.scandir(compliance_dir) as files:
for file in files:
if file.is_file() and file.name.endswith(".json"):
available_compliance_frameworks.append(

View File

@@ -163,6 +163,7 @@ class CheckMetadata(BaseModel):
check_id
and values.get("Provider") != "iac"
and values.get("Provider") != "llm"
and values.get("Provider") != "image"
):
service_from_check_id = check_id.split("_")[0]
if service_name != service_from_check_id:
@@ -183,6 +184,7 @@ class CheckMetadata(BaseModel):
check_id
and values.get("Provider") != "iac"
and values.get("Provider") != "llm"
and values.get("Provider") != "image"
):
if "-" in check_id:
raise ValueError(
@@ -791,6 +793,44 @@ class CheckReportIAC(Check_Report):
)
@dataclass
class CheckReportImage(Check_Report):
"""Contains the Container Image Check's finding information using Trivy."""
resource_name: str
image_digest: str
package_name: str
installed_version: str
fixed_version: str
def __init__(
self,
metadata: Optional[dict] = None,
finding: Optional[dict] = None,
image_name: str = "",
) -> None:
"""
Initialize the Container Image Check's finding information from a Trivy vulnerability/secret dict.
Args:
metadata (Dict): Check metadata.
finding (dict): A single vulnerability/secret result from Trivy's JSON output.
image_name (str): The container image name being scanned.
"""
if metadata is None:
metadata = {}
if finding is None:
finding = {}
super().__init__(metadata, finding)
self.resource = finding
self.resource_name = image_name
self.image_digest = finding.get("PkgID", "")
self.package_name = finding.get("PkgName", "")
self.installed_version = finding.get("InstalledVersion", "")
self.fixed_version = finding.get("FixedVersion", "")
@dataclass
class CheckReportLLM(Check_Report):
"""Contains the LLM Check's finding information."""

View File

@@ -14,8 +14,8 @@ def recover_checks_from_provider(
Returns a list of tuples with the following format (check_name, check_path)
"""
try:
# Bypass check loading for IAC provider since it uses Trivy directly
if provider == "iac" or provider == "llm":
# Bypass check loading for IAC, LLM, and Image providers since they use external tools directly
if provider in ("iac", "llm", "image"):
return []
checks = []

View File

@@ -27,10 +27,10 @@ class ProwlerArgumentParser:
self.parser = argparse.ArgumentParser(
prog="prowler",
formatter_class=RawTextHelpFormatter,
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,nhn,mongodbatlas,oraclecloud,alibabacloud,dashboard,iac} ...",
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,nhn,mongodbatlas,oraclecloud,alibabacloud,dashboard,iac,image} ...",
epilog="""
Available Cloud Providers:
{aws,azure,gcp,kubernetes,m365,github,iac,llm,nhn,mongodbatlas,oraclecloud,alibabacloud}
{aws,azure,gcp,kubernetes,m365,github,iac,llm,image,nhn,mongodbatlas,oraclecloud,alibabacloud}
aws AWS Provider
azure Azure Provider
gcp GCP Provider
@@ -41,6 +41,7 @@ Available Cloud Providers:
alibabacloud Alibaba Cloud Provider
iac IaC Provider (Beta)
llm LLM Provider (Beta)
image Container Image Provider (PoC)
nhn NHN Provider (Unofficial)
mongodbatlas MongoDB Atlas Provider (Beta)

View File

@@ -358,6 +358,23 @@ class Finding(BaseModel):
)
output_data["region"] = check_output.region
elif provider.type == "image":
output_data["auth_method"] = provider.auth_method
output_data["account_uid"] = "image"
output_data["account_name"] = "image"
output_data["resource_name"] = getattr(
check_output, "resource_name", ""
)
output_data["resource_uid"] = getattr(check_output, "resource_name", "")
output_data["region"] = getattr(check_output, "region", "container")
output_data["package_name"] = getattr(check_output, "package_name", "")
output_data["installed_version"] = getattr(
check_output, "installed_version", ""
)
output_data["fixed_version"] = getattr(
check_output, "fixed_version", ""
)
# check_output Unique ID
# TODO: move this to a function
# TODO: in Azure, GCP and K8s there are findings without resource_name

View File

@@ -77,6 +77,12 @@ def display_summary_table(
elif provider.type == "alibabacloud":
entity_type = "Account"
audited_entities = provider.identity.account_id
elif provider.type == "image":
entity_type = "Image"
if len(provider.images) == 1:
audited_entities = provider.images[0]
else:
audited_entities = f"{len(provider.images)} images"
# Check if there are findings and that they are not all MANUAL
if findings and not all(finding.status == "MANUAL" for finding in findings):

View File

@@ -266,6 +266,17 @@ class Provider(ABC):
config_path=arguments.config_file,
fixer_config=fixer_config,
)
elif "image" in provider_class_name.lower():
provider_class(
images=arguments.images,
image_list_file=arguments.image_list_file,
scanners=arguments.scanners,
trivy_severity=arguments.trivy_severity,
ignore_unfixed=arguments.ignore_unfixed,
timeout=arguments.timeout,
config_path=arguments.config_file,
fixer_config=fixer_config,
)
elif "mongodbatlas" in provider_class_name.lower():
provider_class(
atlas_public_key=arguments.atlas_public_key,

View File

@@ -0,0 +1,236 @@
# Container Image Provider (PoC)
This is a proof of concept implementation of a container image scanning provider for Prowler using Trivy.
## Overview
The Image Provider follows the Tool/Wrapper pattern established by the IaC provider. It delegates all scanning logic to Trivy's `trivy image` command and converts the output to Prowler's finding format.
## Prerequisites
### Trivy Installation
Trivy must be installed and available in your PATH. Install using one of these methods:
**macOS (Homebrew):**
```bash
brew install trivy
```
**Linux (apt):**
```bash
sudo apt-get install trivy
```
**Linux (rpm):**
```bash
sudo yum install trivy
```
**Docker:**
```bash
docker pull aquasecurity/trivy
```
For more installation options, see the [Trivy documentation](https://trivy.dev/latest/getting-started/installation/).
## Usage
### Basic Scan
Scan a single container image:
```bash
poetry run python prowler-cli.py image --image nginx:latest
```
### Multiple Images
Scan multiple images in a single run:
```bash
poetry run python prowler-cli.py image --image nginx:latest --image alpine:3.18 --image python:3.11
```
### From File
Scan images listed in a file (one per line):
```bash
# images.txt
nginx:latest
alpine:3.18
python:3.11
# This line is a comment and will be ignored
poetry run python prowler-cli.py image --image-list images.txt
```
### Scanner Selection
By default, the provider uses vulnerability and secret scanners. Customize with:
```bash
# Vulnerability scanning only
poetry run python prowler-cli.py image --image nginx:latest --scanners vuln
# All scanners
poetry run python prowler-cli.py image --image nginx:latest --scanners vuln secret misconfig license
```
### Severity Filtering
Filter findings by severity:
```bash
# Critical and high only
poetry run python prowler-cli.py image --image nginx:latest --trivy-severity CRITICAL HIGH
```
### Ignore Unfixed Vulnerabilities
Skip vulnerabilities without available fixes:
```bash
poetry run python prowler-cli.py image --image nginx:latest --ignore-unfixed
```
### Custom Timeout
Adjust Trivy scan timeout (default: 5m):
```bash
poetry run python prowler-cli.py image --image large-image:latest --timeout 10m
```
### Output Formats
Export results in different formats:
```bash
# JSON and CSV (default includes html)
poetry run python prowler-cli.py image --image nginx:latest --output-formats json-ocsf csv
# Specify output directory
poetry run python prowler-cli.py image --image nginx:latest --output-directory ./scan-results
```
## CLI Reference
```
prowler image [OPTIONS]
Options:
--image, -I Container image to scan (can be specified multiple times)
--image-list File containing list of images to scan (one per line)
--scanners Trivy scanners: vuln, secret, misconfig, license
(default: vuln, secret)
--trivy-severity Filter: CRITICAL, HIGH, MEDIUM, LOW, UNKNOWN
--ignore-unfixed Ignore vulnerabilities without fixes
--timeout Trivy scan timeout (default: 5m)
Standard Prowler Options:
--output-formats, -M Output formats (csv, json-ocsf, html)
--output-directory, -o Output directory
--output-filename, -F Custom output filename
--verbose Show all findings during execution
--no-banner, -b Hide Prowler banner
```
## Architecture
```
prowler/providers/image/
├── __init__.py
├── image_provider.py # Main provider class
├── models.py # ImageOutputOptions
├── README.md # This file
└── lib/
└── arguments/
├── __init__.py
└── arguments.py # CLI argument definitions
```
### Key Components
1. **ImageProvider** (`image_provider.py`):
- Builds and executes `trivy image` commands
- Parses JSON output from Trivy
- Converts findings to `CheckReportImage` format
- Supports scanning multiple images in sequence
2. **CheckReportImage** (`prowler/lib/check/models.py`):
- Extends `Check_Report` base class
- Stores vulnerability-specific fields (package name, versions)
3. **ImageOutputOptions** (`models.py`):
- Customizes output filename generation
4. **CLI Arguments** (`lib/arguments/arguments.py`):
- Defines image provider CLI arguments
- Validates required arguments
## Known Limitations (PoC Scope)
1. **Public Registries Only**: No authentication for private registries
2. **No Local Tar Support**: Cannot scan local image tar files
3. **No SBOM Export**: Does not generate SBOM output
4. **No Compliance Mapping**: No compliance framework integration
5. **Sequential Scanning**: Images scanned one at a time (no parallelization)
## Future Work
For full implementation, consider:
1. **Registry Authentication**:
- Docker config.json support
- Environment variable credentials
- Cloud provider registry integration (ECR, GCR, ACR)
2. **Local Image Support**:
- Scan from tar files (`--input` flag)
- Scan from Docker daemon
3. **SBOM Generation**:
- CycloneDX output
- SPDX output
4. **Performance**:
- Parallel image scanning
- Caching of vulnerability databases
5. **Compliance Integration**:
- Map CVEs to compliance frameworks
- Custom compliance definitions
6. **Enhanced Reporting**:
- Image-specific HTML reports
- Vulnerability trending
## Trivy Output Format
Trivy's JSON output structure for image scanning:
```json
{
"Results": [
{
"Target": "nginx:latest (debian 11.7)",
"Type": "debian",
"Vulnerabilities": [
{
"VulnerabilityID": "CVE-2023-1234",
"PkgName": "openssl",
"InstalledVersion": "1.1.1n-0+deb11u4",
"FixedVersion": "1.1.1n-0+deb11u5",
"Severity": "HIGH",
"Title": "Buffer overflow in...",
"Description": "...",
"PrimaryURL": "https://avd.aquasec.com/nvd/cve-2023-1234"
}
],
"Secrets": [...],
"Misconfigurations": [...]
}
]
}
```
## References
- [Trivy Documentation](https://trivy.dev/docs/latest/)
- [Trivy Image Scanning](https://trivy.dev/docs/latest/guide/target/container_image/)
- [Trivy JSON Output](https://trivy.dev/docs/latest/guide/configuration/reporting/)
- [Prowler IaC Provider](../iac/) - Reference implementation

View File

View File

@@ -0,0 +1,19 @@
from prowler.providers.image.exceptions.exceptions import (
ImageBaseException,
ImageFindingProcessingError,
ImageListFileNotFoundError,
ImageListFileReadError,
ImageNoImagesProvidedError,
ImageScanError,
ImageTrivyBinaryNotFoundError,
)
__all__ = [
"ImageBaseException",
"ImageFindingProcessingError",
"ImageListFileNotFoundError",
"ImageListFileReadError",
"ImageNoImagesProvidedError",
"ImageScanError",
"ImageTrivyBinaryNotFoundError",
]

View File

@@ -0,0 +1,99 @@
from prowler.exceptions.exceptions import ProwlerException
# Exceptions codes from 9000 to 9999 are reserved for Image exceptions
class ImageBaseException(ProwlerException):
"""Base class for Image provider errors."""
IMAGE_ERROR_CODES = {
(9000, "ImageNoImagesProvidedError"): {
"message": "No container images provided for scanning.",
"remediation": "Provide at least one image using --image or --image-list-file.",
},
(9001, "ImageListFileNotFoundError"): {
"message": "Image list file not found.",
"remediation": "Ensure the image list file exists at the specified path.",
},
(9002, "ImageListFileReadError"): {
"message": "Error reading image list file.",
"remediation": "Check file permissions and format. The file should contain one image per line.",
},
(9003, "ImageFindingProcessingError"): {
"message": "Error processing image scan finding.",
"remediation": "Check the Trivy output format and ensure the finding structure is valid.",
},
(9004, "ImageTrivyBinaryNotFoundError"): {
"message": "Trivy binary not found.",
"remediation": "Install Trivy from https://trivy.dev/latest/getting-started/installation/",
},
(9005, "ImageScanError"): {
"message": "Error scanning container image.",
"remediation": "Check the image name and ensure it is accessible.",
},
}
def __init__(self, code, file=None, original_exception=None, message=None):
error_info = self.IMAGE_ERROR_CODES.get((code, self.__class__.__name__))
if message:
error_info["message"] = message
super().__init__(
code,
source="Image",
file=file,
original_exception=original_exception,
error_info=error_info,
)
class ImageNoImagesProvidedError(ImageBaseException):
"""Exception raised when no container images are provided for scanning."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
9000, file=file, original_exception=original_exception, message=message
)
class ImageListFileNotFoundError(ImageBaseException):
"""Exception raised when the image list file is not found."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
9001, file=file, original_exception=original_exception, message=message
)
class ImageListFileReadError(ImageBaseException):
"""Exception raised when the image list file cannot be read."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
9002, file=file, original_exception=original_exception, message=message
)
class ImageFindingProcessingError(ImageBaseException):
"""Exception raised when a finding cannot be processed."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
9003, file=file, original_exception=original_exception, message=message
)
class ImageTrivyBinaryNotFoundError(ImageBaseException):
"""Exception raised when the Trivy binary is not found."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
9004, file=file, original_exception=original_exception, message=message
)
class ImageScanError(ImageBaseException):
"""Exception raised when a general scan error occurs."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
9005, file=file, original_exception=original_exception, message=message
)

View File

@@ -0,0 +1,602 @@
from __future__ import annotations
import json
import os
import subprocess
import sys
from typing import Generator, List
from alive_progress import alive_bar
from colorama import Fore, Style
from prowler.config.config import (
default_config_file_path,
load_and_validate_config_file,
)
from prowler.lib.check.models import CheckReportImage
from prowler.lib.logger import logger
from prowler.lib.utils.utils import print_boxes
from prowler.providers.common.models import Audit_Metadata, Connection
from prowler.providers.common.provider import Provider
from prowler.providers.image.exceptions.exceptions import (
ImageFindingProcessingError,
ImageListFileNotFoundError,
ImageListFileReadError,
ImageNoImagesProvidedError,
ImageTrivyBinaryNotFoundError,
)
class ImageProvider(Provider):
"""
Container Image Provider using Trivy for vulnerability and secret scanning.
This is a Tool/Wrapper provider that delegates all scanning logic to Trivy's
`trivy image` command and converts the output to Prowler's finding format.
"""
_type: str = "image"
audit_metadata: Audit_Metadata
def __init__(
self,
images: list[str] | None = None,
image_list_file: str = None,
scanners: list[str] | None = None,
trivy_severity: list[str] | None = None,
ignore_unfixed: bool = False,
timeout: str = "5m",
config_path: str = None,
config_content: dict = None,
fixer_config: dict | None = None,
registry_username: str = None,
registry_password: str = None,
registry_token: str = None,
):
logger.info("Instantiating Image Provider...")
self.images = images if images is not None else []
self.image_list_file = image_list_file
self.scanners = scanners if scanners is not None else ["vuln", "secret"]
self.trivy_severity = trivy_severity if trivy_severity is not None else []
self.ignore_unfixed = ignore_unfixed
self.timeout = timeout
self.region = "container"
self.audited_account = "image-scan"
self._session = None
self._identity = "prowler"
# Registry authentication (follows IaC pattern: explicit params, env vars internal)
self.registry_username = registry_username or os.environ.get("TRIVY_USERNAME")
self.registry_password = registry_password or os.environ.get("TRIVY_PASSWORD")
self.registry_token = registry_token or os.environ.get("TRIVY_REGISTRY_TOKEN")
if self.registry_username and self.registry_password:
self._auth_method = "Basic auth"
logger.info("Using registry username/password for authentication")
elif self.registry_token:
self._auth_method = "Registry token"
logger.info("Using registry token for authentication")
else:
self._auth_method = "No auth"
# Load images from file if provided
if image_list_file:
self._load_images_from_file(image_list_file)
if not self.images:
raise ImageNoImagesProvidedError(
file=__file__,
message="No images provided for scanning.",
)
# Audit Config
if config_content:
self._audit_config = config_content
else:
if not config_path:
config_path = default_config_file_path
self._audit_config = load_and_validate_config_file(self._type, config_path)
# Fixer Config
self._fixer_config = fixer_config if fixer_config is not None else {}
# Mutelist (not needed for Image provider since Trivy has its own logic)
self._mutelist = None
self.audit_metadata = Audit_Metadata(
provider=self._type,
account_id=self.audited_account,
account_name="image",
region=self.region,
services_scanned=0,
expected_checks=[],
completed_checks=0,
audit_progress=0,
)
Provider.set_global_provider(self)
def _load_images_from_file(self, file_path: str) -> None:
"""Load image names from a file (one per line)."""
try:
with open(file_path, "r") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
self.images.append(line)
logger.info(f"Loaded {len(self.images)} images from {file_path}")
except FileNotFoundError:
raise ImageListFileNotFoundError(
file=file_path,
message=f"Image list file not found: {file_path}",
)
except Exception as error:
raise ImageListFileReadError(
file=file_path,
original_exception=error,
message=f"Error reading image list file: {error}",
)
@property
def auth_method(self) -> str:
return self._auth_method
@property
def type(self) -> str:
return self._type
@property
def identity(self) -> str:
return self._identity
@property
def session(self) -> None:
return self._session
@property
def audit_config(self) -> dict:
return self._audit_config
@property
def fixer_config(self) -> dict:
return self._fixer_config
def setup_session(self) -> None:
"""Image provider doesn't need a session since it uses Trivy directly"""
return None
def _process_finding(
self, finding: dict, image_name: str, finding_type: str
) -> CheckReportImage:
"""
Process a single finding and create a CheckReportImage object.
Args:
finding: The finding object from Trivy output
image_name: The container image name being scanned
finding_type: The type of finding (Vulnerability, Secret, etc.)
Returns:
CheckReportImage: The processed check report
"""
try:
# Determine finding ID based on type
if "VulnerabilityID" in finding:
finding_id = finding["VulnerabilityID"]
finding_description = finding.get(
"Description", finding.get("Title", "")
)
finding_status = "FAIL"
elif "RuleID" in finding:
# Secret finding
finding_id = finding["RuleID"]
finding_description = finding.get("Title", "Secret detected")
finding_status = "FAIL"
else:
finding_id = finding.get("ID", "UNKNOWN")
finding_description = finding.get("Description", "")
finding_status = finding.get("Status", "FAIL")
# Build remediation text for vulnerabilities
remediation_text = ""
if finding.get("FixedVersion"):
remediation_text = f"Upgrade {finding.get('PkgName', 'package')} to version {finding['FixedVersion']}"
elif finding.get("Resolution"):
remediation_text = finding["Resolution"]
# Convert Trivy severity to Prowler severity (lowercase, map UNKNOWN to informational)
trivy_severity = finding.get("Severity", "UNKNOWN").lower()
if trivy_severity == "unknown":
trivy_severity = "informational"
metadata_dict = {
"Provider": "image",
"CheckID": finding_id,
"CheckTitle": finding.get("Title", finding_id),
"CheckType": ["Container Image Security"],
"ServiceName": finding_type,
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": trivy_severity,
"ResourceType": "container-image",
"ResourceGroup": "container",
"Description": finding_description,
"Risk": finding.get(
"Description", "Vulnerability detected in container image"
),
"RelatedUrl": finding.get("PrimaryURL", ""),
"Remediation": {
"Code": {
"NativeIaC": "",
"Terraform": "",
"CLI": "",
"Other": "",
},
"Recommendation": {
"Text": remediation_text,
"Url": finding.get("PrimaryURL", ""),
},
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
}
# Convert metadata dict to JSON string
metadata = json.dumps(metadata_dict)
report = CheckReportImage(
metadata=metadata, finding=finding, image_name=image_name
)
report.status = finding_status
report.status_extended = self._build_status_extended(finding)
report.region = self.region
return report
except Exception as error:
raise ImageFindingProcessingError(
file=__file__,
original_exception=error,
message=f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}",
)
def _build_status_extended(self, finding: dict) -> str:
"""Build a detailed status message for the finding."""
parts = []
if finding.get("VulnerabilityID"):
parts.append(f"{finding['VulnerabilityID']}")
if finding.get("PkgName"):
pkg_info = finding["PkgName"]
if finding.get("InstalledVersion"):
pkg_info += f"@{finding['InstalledVersion']}"
parts.append(f"in package {pkg_info}")
if finding.get("FixedVersion"):
parts.append(f"(fix available: {finding['FixedVersion']})")
elif finding.get("Status") == "will_not_fix":
parts.append("(no fix available)")
if finding.get("Title"):
parts.append(f"- {finding['Title']}")
return (
" ".join(parts) if parts else finding.get("Description", "Finding detected")
)
def run(self) -> List[CheckReportImage]:
"""Execute the container image scan."""
reports = []
for batch in self.run_scan():
reports.extend(batch)
return reports
def run_scan(self) -> Generator[List[CheckReportImage], None, None]:
"""
Run Trivy scan on all configured images.
Yields:
List[CheckReportImage]: Batches of findings
"""
for image in self.images:
try:
yield from self._scan_single_image(image)
except Exception as error:
logger.error(f"Error scanning image {image}: {error}")
continue
def _scan_single_image(
self, image: str
) -> Generator[List[CheckReportImage], None, None]:
"""
Scan a single container image with Trivy.
Args:
image: The container image name/tag to scan
Yields:
List[CheckReportImage]: Batches of findings
"""
try:
logger.info(f"Scanning container image: {image}")
# Build Trivy command
trivy_command = [
"trivy",
"image",
"--format",
"json",
"--scanners",
",".join(self.scanners),
"--timeout",
self.timeout,
]
if self.trivy_severity:
trivy_command.extend(["--severity", ",".join(self.trivy_severity)])
if self.ignore_unfixed:
trivy_command.append("--ignore-unfixed")
trivy_command.append(image)
# Execute Trivy
process = self._execute_trivy(trivy_command, image)
# Log stderr output
if process.stderr:
self._log_trivy_stderr(process.stderr)
# Parse JSON output
try:
output = json.loads(process.stdout)
results = output.get("Results", [])
if not results:
logger.info(f"No findings for image: {image}")
return
except json.JSONDecodeError as error:
logger.error(f"Failed to parse Trivy output for {image}: {error}")
logger.debug(f"Trivy stdout: {process.stdout[:500]}")
return
# Process findings in batches
batch = []
batch_size = 100
for result in results:
target = result.get("Target", image)
result_type = result.get("Type", "unknown")
# Process Vulnerabilities
for vuln in result.get("Vulnerabilities", []):
report = self._process_finding(vuln, target, result_type)
batch.append(report)
if len(batch) >= batch_size:
yield batch
batch = []
# Process Secrets
for secret in result.get("Secrets", []):
report = self._process_finding(secret, target, "secret")
batch.append(report)
if len(batch) >= batch_size:
yield batch
batch = []
# Process Misconfigurations (from Dockerfile)
for misconfig in result.get("Misconfigurations", []):
report = self._process_finding(
misconfig, target, "misconfiguration"
)
batch.append(report)
if len(batch) >= batch_size:
yield batch
batch = []
# Yield remaining findings
if batch:
yield batch
except Exception as error:
if "No such file or directory: 'trivy'" in str(error):
raise ImageTrivyBinaryNotFoundError(
file=__file__,
original_exception=error,
message="Trivy binary not found. Please install Trivy from https://trivy.dev/latest/getting-started/installation/",
)
logger.error(f"Error scanning image {image}: {error}")
def _build_trivy_env(self) -> dict:
"""Build environment variables for Trivy, injecting registry credentials."""
env = dict(os.environ)
if self.registry_username and self.registry_password:
env["TRIVY_USERNAME"] = self.registry_username
env["TRIVY_PASSWORD"] = self.registry_password
elif self.registry_token:
env["TRIVY_REGISTRY_TOKEN"] = self.registry_token
return env
def _execute_trivy(self, command: list, image: str) -> subprocess.CompletedProcess:
"""Execute Trivy command with optional progress bar."""
env = self._build_trivy_env()
try:
if sys.stdout.isatty():
with alive_bar(
ctrl_c=False,
bar="blocks",
spinner="classic",
stats=False,
enrich_print=False,
) as bar:
bar.title = f"-> Scanning {image}..."
process = subprocess.run(
command,
capture_output=True,
text=True,
env=env,
)
bar.title = f"-> Scan completed for {image}"
return process
else:
logger.info(f"Scanning {image}...")
process = subprocess.run(
command,
capture_output=True,
text=True,
env=env,
)
logger.info(f"Scan completed for {image}")
return process
except (AttributeError, OSError):
logger.info(f"Scanning {image}...")
return subprocess.run(command, capture_output=True, text=True, env=env)
def _log_trivy_stderr(self, stderr: str) -> None:
"""Parse and log Trivy's stderr output."""
for line in stderr.strip().split("\n"):
if line.strip():
parts = line.split()
if len(parts) >= 3:
level = parts[1]
message = " ".join(parts[2:])
if level == "ERROR":
logger.error(message)
elif level == "WARN":
logger.warning(message)
elif level == "INFO":
logger.info(message)
elif level == "DEBUG":
logger.debug(message)
else:
logger.info(message)
else:
logger.info(line)
def print_credentials(self) -> None:
"""Print scan configuration."""
report_title = f"{Style.BRIGHT}Scanning container images:{Style.RESET_ALL}"
report_lines = []
if len(self.images) <= 3:
for img in self.images:
report_lines.append(f"Image: {Fore.YELLOW}{img}{Style.RESET_ALL}")
else:
report_lines.append(
f"Images: {Fore.YELLOW}{len(self.images)} images{Style.RESET_ALL}"
)
report_lines.append(
f"Scanners: {Fore.YELLOW}{', '.join(self.scanners)}{Style.RESET_ALL}"
)
if self.trivy_severity:
report_lines.append(
f"Severity filter: {Fore.YELLOW}{', '.join(self.trivy_severity)}{Style.RESET_ALL}"
)
if self.ignore_unfixed:
report_lines.append(f"Ignore unfixed: {Fore.YELLOW}Yes{Style.RESET_ALL}")
report_lines.append(f"Timeout: {Fore.YELLOW}{self.timeout}{Style.RESET_ALL}")
report_lines.append(
f"Authentication method: {Fore.YELLOW}{self.auth_method}{Style.RESET_ALL}"
)
print_boxes(report_lines, report_title)
@staticmethod
def test_connection(
image: str = None,
raise_on_exception: bool = True,
provider_id: str = None,
registry_username: str = None,
registry_password: str = None,
registry_token: str = None,
) -> "Connection":
"""
Test connection to container registry by attempting to inspect an image.
Args:
image: Container image to test
raise_on_exception: Whether to raise exceptions
provider_id: Fallback for image name
registry_username: Registry username for basic auth
registry_password: Registry password for basic auth
registry_token: Registry token for token-based auth
Returns:
Connection: Connection object with success status
"""
try:
if provider_id and not image:
image = provider_id
if not image:
return Connection(is_connected=False, error="Image name is required")
# Build env with registry credentials
env = dict(os.environ)
if registry_username and registry_password:
env["TRIVY_USERNAME"] = registry_username
env["TRIVY_PASSWORD"] = registry_password
elif registry_token:
env["TRIVY_REGISTRY_TOKEN"] = registry_token
# Test by running trivy with --skip-update to just test image access
process = subprocess.run(
[
"trivy",
"image",
"--skip-db-update",
"--download-db-only=false",
image,
],
capture_output=True,
text=True,
timeout=60,
env=env,
)
if process.returncode == 0:
return Connection(is_connected=True)
else:
error_msg = process.stderr or "Unknown error"
if "401" in error_msg or "unauthorized" in error_msg.lower():
return Connection(
is_connected=False,
error="Authentication failed. Check registry credentials.",
)
elif "not found" in error_msg.lower() or "404" in error_msg:
return Connection(
is_connected=False,
error="Image not found in registry.",
)
else:
return Connection(
is_connected=False,
error=f"Failed to access image: {error_msg[:200]}",
)
except subprocess.TimeoutExpired:
return Connection(
is_connected=False,
error="Connection timed out",
)
except FileNotFoundError:
return Connection(
is_connected=False,
error="Trivy binary not found. Please install Trivy.",
)
except Exception as error:
if raise_on_exception:
raise
return Connection(
is_connected=False,
error=f"Unexpected error: {str(error)}",
)

View File

View File

@@ -0,0 +1,113 @@
SCANNERS_CHOICES = [
"vuln",
"secret",
"misconfig",
"license",
]
SEVERITY_CHOICES = [
"CRITICAL",
"HIGH",
"MEDIUM",
"LOW",
"UNKNOWN",
]
def init_parser(self):
"""Init the Image Provider CLI parser"""
image_parser = self.subparsers.add_parser(
"image", parents=[self.common_providers_parser], help="Container Image Provider"
)
# Image Selection
image_selection_group = image_parser.add_argument_group("Image Selection")
image_selection_group.add_argument(
"--image",
"-I",
dest="images",
action="append",
default=[],
help="Container image to scan. Can be specified multiple times. Examples: nginx:latest, alpine:3.18, myregistry.io/myapp:v1.0",
)
image_selection_group.add_argument(
"--image-list",
dest="image_list_file",
default=None,
help="Path to a file containing list of images to scan (one per line). Lines starting with # are treated as comments.",
)
# Scan Configuration
scan_config_group = image_parser.add_argument_group("Scan Configuration")
scan_config_group.add_argument(
"--scanners",
"--scanner",
dest="scanners",
nargs="+",
default=["vuln", "secret"],
choices=SCANNERS_CHOICES,
help="Trivy scanners to use. Default: vuln, secret. Available: vuln, secret, misconfig, license",
)
scan_config_group.add_argument(
"--trivy-severity",
dest="trivy_severity",
nargs="+",
default=[],
choices=SEVERITY_CHOICES,
help="Filter Trivy findings by severity. Default: all severities. Available: CRITICAL, HIGH, MEDIUM, LOW, UNKNOWN",
)
scan_config_group.add_argument(
"--ignore-unfixed",
dest="ignore_unfixed",
action="store_true",
default=False,
help="Ignore vulnerabilities without available fixes.",
)
scan_config_group.add_argument(
"--timeout",
dest="timeout",
default="5m",
help="Trivy scan timeout. Default: 5m. Examples: 10m, 1h",
)
# Registry Authentication
registry_auth_group = image_parser.add_argument_group("Registry Authentication")
registry_auth_group.add_argument(
"--registry-username",
dest="registry_username",
nargs="?",
default=None,
help="Username for private registry authentication (used with --registry-password). If not provided, will use TRIVY_USERNAME env var.",
)
registry_auth_group.add_argument(
"--registry-password",
dest="registry_password",
nargs="?",
default=None,
help="Password for private registry authentication (used with --registry-username). If not provided, will use TRIVY_PASSWORD env var.",
)
registry_auth_group.add_argument(
"--registry-token",
dest="registry_token",
nargs="?",
default=None,
help="Token for private registry authentication. If not provided, will use TRIVY_REGISTRY_TOKEN env var.",
)
def validate_arguments(arguments):
"""Validate Image provider arguments."""
images = getattr(arguments, "images", [])
image_list_file = getattr(arguments, "image_list_file", None)
if not images and not image_list_file:
return (
False,
"At least one image must be specified using --image (-I) or --image-list.",
)
return (True, "")

View File

@@ -0,0 +1,21 @@
from prowler.config.config import output_file_timestamp
from prowler.providers.common.models import ProviderOutputOptions
class ImageOutputOptions(ProviderOutputOptions):
"""
ImageOutputOptions customizes output filename logic for container image scanning.
Attributes inherited from ProviderOutputOptions:
- output_filename (str): The base filename used for generated reports.
- output_directory (str): The directory to store the output files.
"""
def __init__(self, arguments, bulk_checks_metadata):
super().__init__(arguments, bulk_checks_metadata)
# If --output-filename is not specified, build a default name
if not getattr(arguments, "output_filename", None):
self.output_filename = f"prowler-output-image-{output_file_timestamp}"
else:
self.output_filename = arguments.output_filename

View File

@@ -0,0 +1,92 @@
import json
# Sample vulnerability finding from Trivy
SAMPLE_VULNERABILITY_FINDING = {
"VulnerabilityID": "CVE-2024-1234",
"PkgID": "openssl@1.1.1k-r0",
"PkgName": "openssl",
"InstalledVersion": "1.1.1k-r0",
"FixedVersion": "1.1.1l-r0",
"Severity": "HIGH",
"Title": "OpenSSL Buffer Overflow",
"Description": "A buffer overflow vulnerability in OpenSSL allows remote attackers to execute arbitrary code.",
"PrimaryURL": "https://avd.aquasec.com/nvd/cve-2024-1234",
}
# Sample secret finding from Trivy
SAMPLE_SECRET_FINDING = {
"RuleID": "aws-access-key-id",
"Category": "AWS",
"Severity": "CRITICAL",
"Title": "AWS Access Key ID",
"StartLine": 10,
"EndLine": 10,
"Match": "AKIA...",
}
# Sample misconfiguration finding from Trivy
SAMPLE_MISCONFIGURATION_FINDING = {
"ID": "DS001",
"Title": "Dockerfile should not use latest tag",
"Description": "Using latest tag can cause unpredictable builds.",
"Severity": "MEDIUM",
"Resolution": "Use a specific version tag instead of latest",
"PrimaryURL": "https://avd.aquasec.com/misconfig/ds001",
}
# Sample finding with UNKNOWN severity
SAMPLE_UNKNOWN_SEVERITY_FINDING = {
"VulnerabilityID": "CVE-2024-9999",
"PkgID": "test-pkg@0.0.1",
"PkgName": "test-pkg",
"InstalledVersion": "0.0.1",
"Severity": "UNKNOWN",
"Title": "Unknown severity issue",
"Description": "An issue with unknown severity.",
}
# Full Trivy JSON output structure with a single vulnerability
SAMPLE_TRIVY_IMAGE_OUTPUT = {
"Results": [
{
"Target": "alpine:3.18 (alpine 3.18.0)",
"Type": "alpine",
"Vulnerabilities": [SAMPLE_VULNERABILITY_FINDING],
"Secrets": [],
"Misconfigurations": [],
}
]
}
# Full Trivy JSON output with mixed finding types
SAMPLE_TRIVY_MULTI_TYPE_OUTPUT = {
"Results": [
{
"Target": "myimage:latest (debian 12)",
"Type": "debian",
"Vulnerabilities": [SAMPLE_VULNERABILITY_FINDING],
"Secrets": [SAMPLE_SECRET_FINDING],
"Misconfigurations": [SAMPLE_MISCONFIGURATION_FINDING],
}
]
}
def get_sample_trivy_json_output():
"""Return sample Trivy JSON output as string."""
return json.dumps(SAMPLE_TRIVY_IMAGE_OUTPUT)
def get_empty_trivy_output():
"""Return empty Trivy output as string."""
return json.dumps({"Results": []})
def get_invalid_trivy_output():
"""Return invalid JSON output as string."""
return "invalid json output"
def get_multi_type_trivy_output():
"""Return Trivy output with multiple finding types as string."""
return json.dumps(SAMPLE_TRIVY_MULTI_TYPE_OUTPUT)

View File

@@ -0,0 +1,512 @@
import os
import tempfile
from unittest import mock
from unittest.mock import MagicMock, patch
import pytest
from prowler.lib.check.models import CheckReportImage
from prowler.providers.image.exceptions.exceptions import (
ImageListFileNotFoundError,
ImageNoImagesProvidedError,
ImageTrivyBinaryNotFoundError,
)
from prowler.providers.image.image_provider import ImageProvider
from tests.providers.image.image_fixtures import (
SAMPLE_MISCONFIGURATION_FINDING,
SAMPLE_SECRET_FINDING,
SAMPLE_UNKNOWN_SEVERITY_FINDING,
SAMPLE_VULNERABILITY_FINDING,
get_empty_trivy_output,
get_invalid_trivy_output,
get_multi_type_trivy_output,
get_sample_trivy_json_output,
)
def _make_provider(**kwargs):
"""Helper to create an ImageProvider with test defaults."""
defaults = {
"images": ["alpine:3.18"],
"config_content": {},
}
defaults.update(kwargs)
return ImageProvider(**defaults)
class TestImageProvider:
def test_image_provider(self):
"""Test default initialization."""
provider = _make_provider()
assert provider._type == "image"
assert provider.type == "image"
assert provider.images == ["alpine:3.18"]
assert provider.scanners == ["vuln", "secret"]
assert provider.trivy_severity == []
assert provider.ignore_unfixed is False
assert provider.timeout == "5m"
assert provider.region == "container"
assert provider.audited_account == "image-scan"
assert provider.identity == "prowler"
assert provider.auth_method == "No auth"
assert provider.session is None
assert provider.audit_config == {}
assert provider.fixer_config == {}
assert provider._mutelist is None
def test_image_provider_custom_params(self):
"""Test initialization with custom parameters."""
provider = _make_provider(
images=["nginx:1.25", "redis:7"],
scanners=["vuln", "secret", "misconfig"],
trivy_severity=["HIGH", "CRITICAL"],
ignore_unfixed=True,
timeout="10m",
fixer_config={"key": "value"},
)
assert provider.images == ["nginx:1.25", "redis:7"]
assert provider.scanners == ["vuln", "secret", "misconfig"]
assert provider.trivy_severity == ["HIGH", "CRITICAL"]
assert provider.ignore_unfixed is True
assert provider.timeout == "10m"
assert provider.fixer_config == {"key": "value"}
def test_image_provider_with_image_list_file(self):
"""Test loading images from a file, skipping comments and blank lines."""
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
f.write("# Comment line\n")
f.write("alpine:3.18\n")
f.write("\n")
f.write(" nginx:latest \n")
f.write("# Another comment\n")
f.write("redis:7\n")
f.name
provider = _make_provider(
images=None,
image_list_file=f.name,
)
assert "alpine:3.18" in provider.images
assert "nginx:latest" in provider.images
assert "redis:7" in provider.images
assert len(provider.images) == 3
def test_image_provider_no_images(self):
"""Test that ImageNoImagesProvidedError is raised when no images are given."""
with pytest.raises(ImageNoImagesProvidedError):
_make_provider(images=[])
def test_image_provider_image_list_file_not_found(self):
"""Test that ImageListFileNotFoundError is raised for missing file."""
with pytest.raises(ImageListFileNotFoundError):
_make_provider(
images=None,
image_list_file="/nonexistent/path/images.txt",
)
def test_process_finding_vulnerability(self):
"""Test processing a vulnerability finding."""
provider = _make_provider()
report = provider._process_finding(
SAMPLE_VULNERABILITY_FINDING,
"alpine:3.18 (alpine 3.18.0)",
"alpine",
)
assert isinstance(report, CheckReportImage)
assert report.status == "FAIL"
assert report.check_metadata.CheckID == "CVE-2024-1234"
assert report.check_metadata.Severity == "high"
assert report.check_metadata.ServiceName == "alpine"
assert report.check_metadata.ResourceType == "container-image"
assert report.check_metadata.ResourceGroup == "container"
assert report.package_name == "openssl"
assert report.installed_version == "1.1.1k-r0"
assert report.fixed_version == "1.1.1l-r0"
assert report.resource_name == "alpine:3.18 (alpine 3.18.0)"
assert report.region == "container"
def test_process_finding_secret(self):
"""Test processing a secret finding (identified by RuleID)."""
provider = _make_provider()
report = provider._process_finding(
SAMPLE_SECRET_FINDING,
"myimage:latest",
"secret",
)
assert isinstance(report, CheckReportImage)
assert report.status == "FAIL"
assert report.check_metadata.CheckID == "aws-access-key-id"
assert report.check_metadata.Severity == "critical"
assert report.check_metadata.ServiceName == "secret"
def test_process_finding_misconfiguration(self):
"""Test processing a misconfiguration finding (identified by ID)."""
provider = _make_provider()
report = provider._process_finding(
SAMPLE_MISCONFIGURATION_FINDING,
"myimage:latest",
"misconfiguration",
)
assert isinstance(report, CheckReportImage)
assert report.check_metadata.CheckID == "DS001"
assert report.check_metadata.Severity == "medium"
assert report.check_metadata.ServiceName == "misconfiguration"
def test_process_finding_unknown_severity(self):
"""Test that UNKNOWN severity is mapped to informational."""
provider = _make_provider()
report = provider._process_finding(
SAMPLE_UNKNOWN_SEVERITY_FINDING,
"myimage:latest",
"alpine",
)
assert report.check_metadata.Severity == "informational"
@patch("subprocess.run")
def test_run_scan_success(self, mock_subprocess):
"""Test successful scan with mocked subprocess."""
provider = _make_provider()
mock_subprocess.return_value = MagicMock(
stdout=get_sample_trivy_json_output(), stderr=""
)
reports = []
for batch in provider.run_scan():
reports.extend(batch)
assert len(reports) == 1
assert reports[0].check_metadata.CheckID == "CVE-2024-1234"
@patch("subprocess.run")
def test_run_scan_empty_output(self, mock_subprocess):
"""Test scan with empty Trivy output produces no findings."""
provider = _make_provider()
mock_subprocess.return_value = MagicMock(
stdout=get_empty_trivy_output(), stderr=""
)
reports = []
for batch in provider.run_scan():
reports.extend(batch)
assert len(reports) == 0
@patch("subprocess.run")
def test_run_scan_invalid_json(self, mock_subprocess):
"""Test scan with malformed output doesn't crash."""
provider = _make_provider()
mock_subprocess.return_value = MagicMock(
stdout=get_invalid_trivy_output(), stderr=""
)
reports = []
for batch in provider.run_scan():
reports.extend(batch)
assert len(reports) == 0
@patch("subprocess.run")
def test_run_scan_trivy_not_found(self, mock_subprocess):
"""Test that ImageTrivyBinaryNotFoundError is raised when trivy is missing."""
provider = _make_provider()
mock_subprocess.side_effect = FileNotFoundError(
"[Errno 2] No such file or directory: 'trivy'"
)
with pytest.raises(ImageTrivyBinaryNotFoundError):
for _ in provider._scan_single_image("alpine:3.18"):
pass
@patch("subprocess.run")
def test_run_scan_multiple_images(self, mock_subprocess):
"""Test scanning multiple images makes separate subprocess calls."""
provider = _make_provider(images=["alpine:3.18", "nginx:latest"])
mock_subprocess.return_value = MagicMock(
stdout=get_sample_trivy_json_output(), stderr=""
)
reports = []
for batch in provider.run_scan():
reports.extend(batch)
assert mock_subprocess.call_count == 2
@patch("subprocess.run")
def test_run_scan_multi_type_output(self, mock_subprocess):
"""Test scan with vulnerabilities, secrets, and misconfigurations."""
provider = _make_provider()
mock_subprocess.return_value = MagicMock(
stdout=get_multi_type_trivy_output(), stderr=""
)
reports = []
for batch in provider.run_scan():
reports.extend(batch)
assert len(reports) == 3
check_ids = [r.check_metadata.CheckID for r in reports]
assert "CVE-2024-1234" in check_ids
assert "aws-access-key-id" in check_ids
assert "DS001" in check_ids
def test_print_credentials(self):
"""Test that print_credentials outputs image names."""
provider = _make_provider()
with mock.patch("builtins.print") as mock_print:
provider.print_credentials()
output = " ".join(
str(call.args[0]) for call in mock_print.call_args_list if call.args
)
assert "alpine:3.18" in output
@patch("subprocess.run")
def test_test_connection_success(self, mock_subprocess):
"""Test successful connection returns is_connected=True."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
result = ImageProvider.test_connection(image="alpine:3.18")
assert result.is_connected is True
@patch("subprocess.run")
def test_test_connection_auth_failure(self, mock_subprocess):
"""Test 401 error returns auth failure."""
mock_subprocess.return_value = MagicMock(
returncode=1, stderr="401 unauthorized"
)
result = ImageProvider.test_connection(image="private/image:latest")
assert result.is_connected is False
assert "Authentication failed" in result.error
@patch("subprocess.run")
def test_test_connection_not_found(self, mock_subprocess):
"""Test 404 error returns not found."""
mock_subprocess.return_value = MagicMock(returncode=1, stderr="404 not found")
result = ImageProvider.test_connection(image="nonexistent/image:latest")
assert result.is_connected is False
assert "not found" in result.error
def test_build_status_extended(self):
"""Test status message content for different finding types."""
provider = _make_provider()
# Vulnerability with fix
status = provider._build_status_extended(SAMPLE_VULNERABILITY_FINDING)
assert "CVE-2024-1234" in status
assert "openssl" in status
assert "fix available" in status
# Finding with no special fields
status = provider._build_status_extended({"Description": "Simple finding"})
assert status == "Simple finding"
# Finding with will_not_fix status
finding_no_fix = {
"VulnerabilityID": "CVE-2024-0000",
"PkgName": "libc",
"Status": "will_not_fix",
"Title": "Some vuln",
}
status = provider._build_status_extended(finding_no_fix)
assert "no fix available" in status
def test_validate_arguments(self):
"""Test valid and invalid argument combinations."""
# Valid: images provided
provider = _make_provider(images=["alpine:3.18"])
assert provider.images == ["alpine:3.18"]
# Invalid: empty images and no file
with pytest.raises(ImageNoImagesProvidedError):
_make_provider(images=[])
# Valid: custom scanners
provider = _make_provider(scanners=["vuln"])
assert provider.scanners == ["vuln"]
def test_setup_session(self):
"""Test that setup_session returns None."""
provider = _make_provider()
assert provider.setup_session() is None
@patch("subprocess.run")
def test_run_method(self, mock_subprocess):
"""Test that run() collects all batches into a list."""
provider = _make_provider()
mock_subprocess.return_value = MagicMock(
stdout=get_sample_trivy_json_output(), stderr=""
)
reports = provider.run()
assert isinstance(reports, list)
assert len(reports) == 1
class TestImageProviderRegistryAuth:
def test_no_auth_by_default(self):
"""Test that no auth is set when no credentials are provided."""
provider = _make_provider()
assert provider.registry_username is None
assert provider.registry_password is None
assert provider.registry_token is None
assert provider.auth_method == "No auth"
def test_basic_auth_with_explicit_params(self):
"""Test basic auth via explicit constructor params."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
assert provider.registry_username == "myuser"
assert provider.registry_password == "mypass"
assert provider.auth_method == "Basic auth"
def test_token_auth_with_explicit_param(self):
"""Test token auth via explicit constructor param."""
provider = _make_provider(registry_token="my-token-123")
assert provider.registry_token == "my-token-123"
assert provider.auth_method == "Registry token"
def test_basic_auth_takes_precedence_over_token(self):
"""Test that username/password takes precedence over token."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
registry_token="my-token",
)
assert provider.auth_method == "Basic auth"
@patch.dict(os.environ, {"TRIVY_USERNAME": "envuser", "TRIVY_PASSWORD": "envpass"})
def test_basic_auth_from_env_vars(self):
"""Test that env vars are used as fallback for basic auth."""
provider = _make_provider()
assert provider.registry_username == "envuser"
assert provider.registry_password == "envpass"
assert provider.auth_method == "Basic auth"
@patch.dict(os.environ, {"TRIVY_REGISTRY_TOKEN": "env-token"})
def test_token_auth_from_env_var(self):
"""Test that env var is used as fallback for token auth."""
provider = _make_provider()
assert provider.registry_token == "env-token"
assert provider.auth_method == "Registry token"
@patch.dict(os.environ, {"TRIVY_USERNAME": "envuser", "TRIVY_PASSWORD": "envpass"})
def test_explicit_params_override_env_vars(self):
"""Test that explicit params take precedence over env vars."""
provider = _make_provider(
registry_username="explicit",
registry_password="explicit-pass",
)
assert provider.registry_username == "explicit"
assert provider.registry_password == "explicit-pass"
def test_build_trivy_env_no_auth(self):
"""Test that _build_trivy_env returns base env when no auth."""
provider = _make_provider()
env = provider._build_trivy_env()
assert "TRIVY_USERNAME" not in env
assert "TRIVY_PASSWORD" not in env
assert "TRIVY_REGISTRY_TOKEN" not in env
def test_build_trivy_env_basic_auth(self):
"""Test that _build_trivy_env injects username/password."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
env = provider._build_trivy_env()
assert env["TRIVY_USERNAME"] == "myuser"
assert env["TRIVY_PASSWORD"] == "mypass"
def test_build_trivy_env_token_auth(self):
"""Test that _build_trivy_env injects registry token."""
provider = _make_provider(registry_token="my-token")
env = provider._build_trivy_env()
assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
@patch("subprocess.run")
def test_execute_trivy_passes_env(self, mock_subprocess):
"""Test that _execute_trivy passes credentials via env."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
mock_subprocess.return_value = MagicMock(
stdout=get_sample_trivy_json_output(), stderr=""
)
provider._execute_trivy(["trivy", "image", "alpine:3.18"], "alpine:3.18")
call_kwargs = mock_subprocess.call_args
env = call_kwargs.kwargs.get("env") or call_kwargs[1].get("env")
assert env["TRIVY_USERNAME"] == "myuser"
assert env["TRIVY_PASSWORD"] == "mypass"
@patch("subprocess.run")
def test_test_connection_with_basic_auth(self, mock_subprocess):
"""Test test_connection passes credentials via env."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
result = ImageProvider.test_connection(
image="private.registry.io/myapp:v1",
registry_username="myuser",
registry_password="mypass",
)
assert result.is_connected is True
call_kwargs = mock_subprocess.call_args
env = call_kwargs.kwargs.get("env") or call_kwargs[1].get("env")
assert env["TRIVY_USERNAME"] == "myuser"
assert env["TRIVY_PASSWORD"] == "mypass"
@patch("subprocess.run")
def test_test_connection_with_token(self, mock_subprocess):
"""Test test_connection passes token via env."""
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
result = ImageProvider.test_connection(
image="private.registry.io/myapp:v1",
registry_token="my-token",
)
assert result.is_connected is True
call_kwargs = mock_subprocess.call_args
env = call_kwargs.kwargs.get("env") or call_kwargs[1].get("env")
assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
def test_print_credentials_shows_auth_method(self):
"""Test that print_credentials outputs the auth method."""
provider = _make_provider(
registry_username="myuser",
registry_password="mypass",
)
with mock.patch("builtins.print") as mock_print:
provider.print_credentials()
output = " ".join(
str(call.args[0]) for call in mock_print.call_args_list if call.args
)
assert "Basic auth" in output