mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-03-24 04:28:02 +00:00
Compare commits
4 Commits
docs-agent
...
image-scan
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b48b381624 | ||
|
|
9d5e3f4758 | ||
|
|
beb74a6459 | ||
|
|
f42de0d21b |
@@ -20,6 +20,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
|
||||
- CIS 1.12 compliance framework for Kubernetes [(#9778)](https://github.com/prowler-cloud/prowler/pull/9778)
|
||||
- CIS 6.0 for M365 provider [(#9779)](https://github.com/prowler-cloud/prowler/pull/9779)
|
||||
- CIS 5.0 compliance framework for the Azure provider [(#9777)](https://github.com/prowler-cloud/prowler/pull/9777)
|
||||
- Container Image provider (POC) using Trivy for vulnerability and secret scanning
|
||||
|
||||
### Changed
|
||||
- Update AWS Step Functions service metadata to new format [(#9432)](https://github.com/prowler-cloud/prowler/pull/9432)
|
||||
|
||||
@@ -118,6 +118,7 @@ from prowler.providers.common.quick_inventory import run_provider_quick_inventor
|
||||
from prowler.providers.gcp.models import GCPOutputOptions
|
||||
from prowler.providers.github.models import GithubOutputOptions
|
||||
from prowler.providers.iac.models import IACOutputOptions
|
||||
from prowler.providers.image.models import ImageOutputOptions
|
||||
from prowler.providers.kubernetes.models import KubernetesOutputOptions
|
||||
from prowler.providers.llm.models import LLMOutputOptions
|
||||
from prowler.providers.m365.models import M365OutputOptions
|
||||
@@ -204,8 +205,8 @@ def prowler():
|
||||
# Load compliance frameworks
|
||||
logger.debug("Loading compliance frameworks from .json files")
|
||||
|
||||
# Skip compliance frameworks for IAC and LLM providers
|
||||
if provider != "iac" and provider != "llm":
|
||||
# Skip compliance frameworks for IAC, LLM, and Image providers
|
||||
if provider not in ("iac", "llm", "image"):
|
||||
bulk_compliance_frameworks = Compliance.get_bulk(provider)
|
||||
# Complete checks metadata with the compliance framework specification
|
||||
bulk_checks_metadata = update_checks_metadata_with_compliance(
|
||||
@@ -262,8 +263,8 @@ def prowler():
|
||||
if not args.only_logs:
|
||||
global_provider.print_credentials()
|
||||
|
||||
# Skip service and check loading for IAC and LLM providers
|
||||
if provider != "iac" and provider != "llm":
|
||||
# Skip service and check loading for IAC, LLM, and Image providers
|
||||
if provider not in ("iac", "llm", "image"):
|
||||
# Import custom checks from folder
|
||||
if checks_folder:
|
||||
custom_checks = parse_checks_from_folder(global_provider, checks_folder)
|
||||
@@ -346,6 +347,8 @@ def prowler():
|
||||
)
|
||||
elif provider == "iac":
|
||||
output_options = IACOutputOptions(args, bulk_checks_metadata)
|
||||
elif provider == "image":
|
||||
output_options = ImageOutputOptions(args, bulk_checks_metadata)
|
||||
elif provider == "llm":
|
||||
output_options = LLMOutputOptions(args, bulk_checks_metadata)
|
||||
elif provider == "oraclecloud":
|
||||
@@ -365,8 +368,8 @@ def prowler():
|
||||
# Execute checks
|
||||
findings = []
|
||||
|
||||
if provider == "iac" or provider == "llm":
|
||||
# For IAC and LLM providers, run the scan directly
|
||||
if provider in ("iac", "llm", "image"):
|
||||
# For IAC, LLM, and Image providers, run the scan directly
|
||||
if provider == "llm":
|
||||
|
||||
def streaming_callback(findings_batch):
|
||||
|
||||
@@ -73,7 +73,10 @@ def get_available_compliance_frameworks(provider=None):
|
||||
if provider:
|
||||
providers = [provider]
|
||||
for provider in providers:
|
||||
with os.scandir(f"{actual_directory}/../compliance/{provider}") as files:
|
||||
compliance_dir = f"{actual_directory}/../compliance/{provider}"
|
||||
if not os.path.isdir(compliance_dir):
|
||||
continue
|
||||
with os.scandir(compliance_dir) as files:
|
||||
for file in files:
|
||||
if file.is_file() and file.name.endswith(".json"):
|
||||
available_compliance_frameworks.append(
|
||||
|
||||
@@ -163,6 +163,7 @@ class CheckMetadata(BaseModel):
|
||||
check_id
|
||||
and values.get("Provider") != "iac"
|
||||
and values.get("Provider") != "llm"
|
||||
and values.get("Provider") != "image"
|
||||
):
|
||||
service_from_check_id = check_id.split("_")[0]
|
||||
if service_name != service_from_check_id:
|
||||
@@ -183,6 +184,7 @@ class CheckMetadata(BaseModel):
|
||||
check_id
|
||||
and values.get("Provider") != "iac"
|
||||
and values.get("Provider") != "llm"
|
||||
and values.get("Provider") != "image"
|
||||
):
|
||||
if "-" in check_id:
|
||||
raise ValueError(
|
||||
@@ -791,6 +793,44 @@ class CheckReportIAC(Check_Report):
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class CheckReportImage(Check_Report):
|
||||
"""Contains the Container Image Check's finding information using Trivy."""
|
||||
|
||||
resource_name: str
|
||||
image_digest: str
|
||||
package_name: str
|
||||
installed_version: str
|
||||
fixed_version: str
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
metadata: Optional[dict] = None,
|
||||
finding: Optional[dict] = None,
|
||||
image_name: str = "",
|
||||
) -> None:
|
||||
"""
|
||||
Initialize the Container Image Check's finding information from a Trivy vulnerability/secret dict.
|
||||
|
||||
Args:
|
||||
metadata (Dict): Check metadata.
|
||||
finding (dict): A single vulnerability/secret result from Trivy's JSON output.
|
||||
image_name (str): The container image name being scanned.
|
||||
"""
|
||||
if metadata is None:
|
||||
metadata = {}
|
||||
if finding is None:
|
||||
finding = {}
|
||||
super().__init__(metadata, finding)
|
||||
|
||||
self.resource = finding
|
||||
self.resource_name = image_name
|
||||
self.image_digest = finding.get("PkgID", "")
|
||||
self.package_name = finding.get("PkgName", "")
|
||||
self.installed_version = finding.get("InstalledVersion", "")
|
||||
self.fixed_version = finding.get("FixedVersion", "")
|
||||
|
||||
|
||||
@dataclass
|
||||
class CheckReportLLM(Check_Report):
|
||||
"""Contains the LLM Check's finding information."""
|
||||
|
||||
@@ -14,8 +14,8 @@ def recover_checks_from_provider(
|
||||
Returns a list of tuples with the following format (check_name, check_path)
|
||||
"""
|
||||
try:
|
||||
# Bypass check loading for IAC provider since it uses Trivy directly
|
||||
if provider == "iac" or provider == "llm":
|
||||
# Bypass check loading for IAC, LLM, and Image providers since they use external tools directly
|
||||
if provider in ("iac", "llm", "image"):
|
||||
return []
|
||||
|
||||
checks = []
|
||||
|
||||
@@ -27,10 +27,10 @@ class ProwlerArgumentParser:
|
||||
self.parser = argparse.ArgumentParser(
|
||||
prog="prowler",
|
||||
formatter_class=RawTextHelpFormatter,
|
||||
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,nhn,mongodbatlas,oraclecloud,alibabacloud,dashboard,iac} ...",
|
||||
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,nhn,mongodbatlas,oraclecloud,alibabacloud,dashboard,iac,image} ...",
|
||||
epilog="""
|
||||
Available Cloud Providers:
|
||||
{aws,azure,gcp,kubernetes,m365,github,iac,llm,nhn,mongodbatlas,oraclecloud,alibabacloud}
|
||||
{aws,azure,gcp,kubernetes,m365,github,iac,llm,image,nhn,mongodbatlas,oraclecloud,alibabacloud}
|
||||
aws AWS Provider
|
||||
azure Azure Provider
|
||||
gcp GCP Provider
|
||||
@@ -41,6 +41,7 @@ Available Cloud Providers:
|
||||
alibabacloud Alibaba Cloud Provider
|
||||
iac IaC Provider (Beta)
|
||||
llm LLM Provider (Beta)
|
||||
image Container Image Provider (PoC)
|
||||
nhn NHN Provider (Unofficial)
|
||||
mongodbatlas MongoDB Atlas Provider (Beta)
|
||||
|
||||
|
||||
@@ -358,6 +358,23 @@ class Finding(BaseModel):
|
||||
)
|
||||
output_data["region"] = check_output.region
|
||||
|
||||
elif provider.type == "image":
|
||||
output_data["auth_method"] = provider.auth_method
|
||||
output_data["account_uid"] = "image"
|
||||
output_data["account_name"] = "image"
|
||||
output_data["resource_name"] = getattr(
|
||||
check_output, "resource_name", ""
|
||||
)
|
||||
output_data["resource_uid"] = getattr(check_output, "resource_name", "")
|
||||
output_data["region"] = getattr(check_output, "region", "container")
|
||||
output_data["package_name"] = getattr(check_output, "package_name", "")
|
||||
output_data["installed_version"] = getattr(
|
||||
check_output, "installed_version", ""
|
||||
)
|
||||
output_data["fixed_version"] = getattr(
|
||||
check_output, "fixed_version", ""
|
||||
)
|
||||
|
||||
# check_output Unique ID
|
||||
# TODO: move this to a function
|
||||
# TODO: in Azure, GCP and K8s there are findings without resource_name
|
||||
|
||||
@@ -77,6 +77,12 @@ def display_summary_table(
|
||||
elif provider.type == "alibabacloud":
|
||||
entity_type = "Account"
|
||||
audited_entities = provider.identity.account_id
|
||||
elif provider.type == "image":
|
||||
entity_type = "Image"
|
||||
if len(provider.images) == 1:
|
||||
audited_entities = provider.images[0]
|
||||
else:
|
||||
audited_entities = f"{len(provider.images)} images"
|
||||
|
||||
# Check if there are findings and that they are not all MANUAL
|
||||
if findings and not all(finding.status == "MANUAL" for finding in findings):
|
||||
|
||||
@@ -266,6 +266,17 @@ class Provider(ABC):
|
||||
config_path=arguments.config_file,
|
||||
fixer_config=fixer_config,
|
||||
)
|
||||
elif "image" in provider_class_name.lower():
|
||||
provider_class(
|
||||
images=arguments.images,
|
||||
image_list_file=arguments.image_list_file,
|
||||
scanners=arguments.scanners,
|
||||
trivy_severity=arguments.trivy_severity,
|
||||
ignore_unfixed=arguments.ignore_unfixed,
|
||||
timeout=arguments.timeout,
|
||||
config_path=arguments.config_file,
|
||||
fixer_config=fixer_config,
|
||||
)
|
||||
elif "mongodbatlas" in provider_class_name.lower():
|
||||
provider_class(
|
||||
atlas_public_key=arguments.atlas_public_key,
|
||||
|
||||
236
prowler/providers/image/README.md
Normal file
236
prowler/providers/image/README.md
Normal file
@@ -0,0 +1,236 @@
|
||||
# Container Image Provider (PoC)
|
||||
|
||||
This is a proof of concept implementation of a container image scanning provider for Prowler using Trivy.
|
||||
|
||||
## Overview
|
||||
|
||||
The Image Provider follows the Tool/Wrapper pattern established by the IaC provider. It delegates all scanning logic to Trivy's `trivy image` command and converts the output to Prowler's finding format.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
### Trivy Installation
|
||||
|
||||
Trivy must be installed and available in your PATH. Install using one of these methods:
|
||||
|
||||
**macOS (Homebrew):**
|
||||
```bash
|
||||
brew install trivy
|
||||
```
|
||||
|
||||
**Linux (apt):**
|
||||
```bash
|
||||
sudo apt-get install trivy
|
||||
```
|
||||
|
||||
**Linux (rpm):**
|
||||
```bash
|
||||
sudo yum install trivy
|
||||
```
|
||||
|
||||
**Docker:**
|
||||
```bash
|
||||
docker pull aquasecurity/trivy
|
||||
```
|
||||
|
||||
For more installation options, see the [Trivy documentation](https://trivy.dev/latest/getting-started/installation/).
|
||||
|
||||
## Usage
|
||||
|
||||
### Basic Scan
|
||||
|
||||
Scan a single container image:
|
||||
```bash
|
||||
poetry run python prowler-cli.py image --image nginx:latest
|
||||
```
|
||||
|
||||
### Multiple Images
|
||||
|
||||
Scan multiple images in a single run:
|
||||
```bash
|
||||
poetry run python prowler-cli.py image --image nginx:latest --image alpine:3.18 --image python:3.11
|
||||
```
|
||||
|
||||
### From File
|
||||
|
||||
Scan images listed in a file (one per line):
|
||||
```bash
|
||||
# images.txt
|
||||
nginx:latest
|
||||
alpine:3.18
|
||||
python:3.11
|
||||
# This line is a comment and will be ignored
|
||||
|
||||
poetry run python prowler-cli.py image --image-list images.txt
|
||||
```
|
||||
|
||||
### Scanner Selection
|
||||
|
||||
By default, the provider uses vulnerability and secret scanners. Customize with:
|
||||
```bash
|
||||
# Vulnerability scanning only
|
||||
poetry run python prowler-cli.py image --image nginx:latest --scanners vuln
|
||||
|
||||
# All scanners
|
||||
poetry run python prowler-cli.py image --image nginx:latest --scanners vuln secret misconfig license
|
||||
```
|
||||
|
||||
### Severity Filtering
|
||||
|
||||
Filter findings by severity:
|
||||
```bash
|
||||
# Critical and high only
|
||||
poetry run python prowler-cli.py image --image nginx:latest --trivy-severity CRITICAL HIGH
|
||||
```
|
||||
|
||||
### Ignore Unfixed Vulnerabilities
|
||||
|
||||
Skip vulnerabilities without available fixes:
|
||||
```bash
|
||||
poetry run python prowler-cli.py image --image nginx:latest --ignore-unfixed
|
||||
```
|
||||
|
||||
### Custom Timeout
|
||||
|
||||
Adjust Trivy scan timeout (default: 5m):
|
||||
```bash
|
||||
poetry run python prowler-cli.py image --image large-image:latest --timeout 10m
|
||||
```
|
||||
|
||||
### Output Formats
|
||||
|
||||
Export results in different formats:
|
||||
```bash
|
||||
# JSON and CSV (default includes html)
|
||||
poetry run python prowler-cli.py image --image nginx:latest --output-formats json-ocsf csv
|
||||
|
||||
# Specify output directory
|
||||
poetry run python prowler-cli.py image --image nginx:latest --output-directory ./scan-results
|
||||
```
|
||||
|
||||
## CLI Reference
|
||||
|
||||
```
|
||||
prowler image [OPTIONS]
|
||||
|
||||
Options:
|
||||
--image, -I Container image to scan (can be specified multiple times)
|
||||
--image-list File containing list of images to scan (one per line)
|
||||
--scanners Trivy scanners: vuln, secret, misconfig, license
|
||||
(default: vuln, secret)
|
||||
--trivy-severity Filter: CRITICAL, HIGH, MEDIUM, LOW, UNKNOWN
|
||||
--ignore-unfixed Ignore vulnerabilities without fixes
|
||||
--timeout Trivy scan timeout (default: 5m)
|
||||
|
||||
Standard Prowler Options:
|
||||
--output-formats, -M Output formats (csv, json-ocsf, html)
|
||||
--output-directory, -o Output directory
|
||||
--output-filename, -F Custom output filename
|
||||
--verbose Show all findings during execution
|
||||
--no-banner, -b Hide Prowler banner
|
||||
```
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
prowler/providers/image/
|
||||
├── __init__.py
|
||||
├── image_provider.py # Main provider class
|
||||
├── models.py # ImageOutputOptions
|
||||
├── README.md # This file
|
||||
└── lib/
|
||||
└── arguments/
|
||||
├── __init__.py
|
||||
└── arguments.py # CLI argument definitions
|
||||
```
|
||||
|
||||
### Key Components
|
||||
|
||||
1. **ImageProvider** (`image_provider.py`):
|
||||
- Builds and executes `trivy image` commands
|
||||
- Parses JSON output from Trivy
|
||||
- Converts findings to `CheckReportImage` format
|
||||
- Supports scanning multiple images in sequence
|
||||
|
||||
2. **CheckReportImage** (`prowler/lib/check/models.py`):
|
||||
- Extends `Check_Report` base class
|
||||
- Stores vulnerability-specific fields (package name, versions)
|
||||
|
||||
3. **ImageOutputOptions** (`models.py`):
|
||||
- Customizes output filename generation
|
||||
|
||||
4. **CLI Arguments** (`lib/arguments/arguments.py`):
|
||||
- Defines image provider CLI arguments
|
||||
- Validates required arguments
|
||||
|
||||
## Known Limitations (PoC Scope)
|
||||
|
||||
1. **Public Registries Only**: No authentication for private registries
|
||||
2. **No Local Tar Support**: Cannot scan local image tar files
|
||||
3. **No SBOM Export**: Does not generate SBOM output
|
||||
4. **No Compliance Mapping**: No compliance framework integration
|
||||
5. **Sequential Scanning**: Images scanned one at a time (no parallelization)
|
||||
|
||||
## Future Work
|
||||
|
||||
For full implementation, consider:
|
||||
|
||||
1. **Registry Authentication**:
|
||||
- Docker config.json support
|
||||
- Environment variable credentials
|
||||
- Cloud provider registry integration (ECR, GCR, ACR)
|
||||
|
||||
2. **Local Image Support**:
|
||||
- Scan from tar files (`--input` flag)
|
||||
- Scan from Docker daemon
|
||||
|
||||
3. **SBOM Generation**:
|
||||
- CycloneDX output
|
||||
- SPDX output
|
||||
|
||||
4. **Performance**:
|
||||
- Parallel image scanning
|
||||
- Caching of vulnerability databases
|
||||
|
||||
5. **Compliance Integration**:
|
||||
- Map CVEs to compliance frameworks
|
||||
- Custom compliance definitions
|
||||
|
||||
6. **Enhanced Reporting**:
|
||||
- Image-specific HTML reports
|
||||
- Vulnerability trending
|
||||
|
||||
## Trivy Output Format
|
||||
|
||||
Trivy's JSON output structure for image scanning:
|
||||
|
||||
```json
|
||||
{
|
||||
"Results": [
|
||||
{
|
||||
"Target": "nginx:latest (debian 11.7)",
|
||||
"Type": "debian",
|
||||
"Vulnerabilities": [
|
||||
{
|
||||
"VulnerabilityID": "CVE-2023-1234",
|
||||
"PkgName": "openssl",
|
||||
"InstalledVersion": "1.1.1n-0+deb11u4",
|
||||
"FixedVersion": "1.1.1n-0+deb11u5",
|
||||
"Severity": "HIGH",
|
||||
"Title": "Buffer overflow in...",
|
||||
"Description": "...",
|
||||
"PrimaryURL": "https://avd.aquasec.com/nvd/cve-2023-1234"
|
||||
}
|
||||
],
|
||||
"Secrets": [...],
|
||||
"Misconfigurations": [...]
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
## References
|
||||
|
||||
- [Trivy Documentation](https://trivy.dev/docs/latest/)
|
||||
- [Trivy Image Scanning](https://trivy.dev/docs/latest/guide/target/container_image/)
|
||||
- [Trivy JSON Output](https://trivy.dev/docs/latest/guide/configuration/reporting/)
|
||||
- [Prowler IaC Provider](../iac/) - Reference implementation
|
||||
0
prowler/providers/image/__init__.py
Normal file
0
prowler/providers/image/__init__.py
Normal file
19
prowler/providers/image/exceptions/__init__.py
Normal file
19
prowler/providers/image/exceptions/__init__.py
Normal file
@@ -0,0 +1,19 @@
|
||||
from prowler.providers.image.exceptions.exceptions import (
|
||||
ImageBaseException,
|
||||
ImageFindingProcessingError,
|
||||
ImageListFileNotFoundError,
|
||||
ImageListFileReadError,
|
||||
ImageNoImagesProvidedError,
|
||||
ImageScanError,
|
||||
ImageTrivyBinaryNotFoundError,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"ImageBaseException",
|
||||
"ImageFindingProcessingError",
|
||||
"ImageListFileNotFoundError",
|
||||
"ImageListFileReadError",
|
||||
"ImageNoImagesProvidedError",
|
||||
"ImageScanError",
|
||||
"ImageTrivyBinaryNotFoundError",
|
||||
]
|
||||
99
prowler/providers/image/exceptions/exceptions.py
Normal file
99
prowler/providers/image/exceptions/exceptions.py
Normal file
@@ -0,0 +1,99 @@
|
||||
from prowler.exceptions.exceptions import ProwlerException
|
||||
|
||||
|
||||
# Exceptions codes from 9000 to 9999 are reserved for Image exceptions
|
||||
class ImageBaseException(ProwlerException):
|
||||
"""Base class for Image provider errors."""
|
||||
|
||||
IMAGE_ERROR_CODES = {
|
||||
(9000, "ImageNoImagesProvidedError"): {
|
||||
"message": "No container images provided for scanning.",
|
||||
"remediation": "Provide at least one image using --image or --image-list-file.",
|
||||
},
|
||||
(9001, "ImageListFileNotFoundError"): {
|
||||
"message": "Image list file not found.",
|
||||
"remediation": "Ensure the image list file exists at the specified path.",
|
||||
},
|
||||
(9002, "ImageListFileReadError"): {
|
||||
"message": "Error reading image list file.",
|
||||
"remediation": "Check file permissions and format. The file should contain one image per line.",
|
||||
},
|
||||
(9003, "ImageFindingProcessingError"): {
|
||||
"message": "Error processing image scan finding.",
|
||||
"remediation": "Check the Trivy output format and ensure the finding structure is valid.",
|
||||
},
|
||||
(9004, "ImageTrivyBinaryNotFoundError"): {
|
||||
"message": "Trivy binary not found.",
|
||||
"remediation": "Install Trivy from https://trivy.dev/latest/getting-started/installation/",
|
||||
},
|
||||
(9005, "ImageScanError"): {
|
||||
"message": "Error scanning container image.",
|
||||
"remediation": "Check the image name and ensure it is accessible.",
|
||||
},
|
||||
}
|
||||
|
||||
def __init__(self, code, file=None, original_exception=None, message=None):
|
||||
error_info = self.IMAGE_ERROR_CODES.get((code, self.__class__.__name__))
|
||||
if message:
|
||||
error_info["message"] = message
|
||||
super().__init__(
|
||||
code,
|
||||
source="Image",
|
||||
file=file,
|
||||
original_exception=original_exception,
|
||||
error_info=error_info,
|
||||
)
|
||||
|
||||
|
||||
class ImageNoImagesProvidedError(ImageBaseException):
|
||||
"""Exception raised when no container images are provided for scanning."""
|
||||
|
||||
def __init__(self, file=None, original_exception=None, message=None):
|
||||
super().__init__(
|
||||
9000, file=file, original_exception=original_exception, message=message
|
||||
)
|
||||
|
||||
|
||||
class ImageListFileNotFoundError(ImageBaseException):
|
||||
"""Exception raised when the image list file is not found."""
|
||||
|
||||
def __init__(self, file=None, original_exception=None, message=None):
|
||||
super().__init__(
|
||||
9001, file=file, original_exception=original_exception, message=message
|
||||
)
|
||||
|
||||
|
||||
class ImageListFileReadError(ImageBaseException):
|
||||
"""Exception raised when the image list file cannot be read."""
|
||||
|
||||
def __init__(self, file=None, original_exception=None, message=None):
|
||||
super().__init__(
|
||||
9002, file=file, original_exception=original_exception, message=message
|
||||
)
|
||||
|
||||
|
||||
class ImageFindingProcessingError(ImageBaseException):
|
||||
"""Exception raised when a finding cannot be processed."""
|
||||
|
||||
def __init__(self, file=None, original_exception=None, message=None):
|
||||
super().__init__(
|
||||
9003, file=file, original_exception=original_exception, message=message
|
||||
)
|
||||
|
||||
|
||||
class ImageTrivyBinaryNotFoundError(ImageBaseException):
|
||||
"""Exception raised when the Trivy binary is not found."""
|
||||
|
||||
def __init__(self, file=None, original_exception=None, message=None):
|
||||
super().__init__(
|
||||
9004, file=file, original_exception=original_exception, message=message
|
||||
)
|
||||
|
||||
|
||||
class ImageScanError(ImageBaseException):
|
||||
"""Exception raised when a general scan error occurs."""
|
||||
|
||||
def __init__(self, file=None, original_exception=None, message=None):
|
||||
super().__init__(
|
||||
9005, file=file, original_exception=original_exception, message=message
|
||||
)
|
||||
602
prowler/providers/image/image_provider.py
Normal file
602
prowler/providers/image/image_provider.py
Normal file
@@ -0,0 +1,602 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from typing import Generator, List
|
||||
|
||||
from alive_progress import alive_bar
|
||||
from colorama import Fore, Style
|
||||
|
||||
from prowler.config.config import (
|
||||
default_config_file_path,
|
||||
load_and_validate_config_file,
|
||||
)
|
||||
from prowler.lib.check.models import CheckReportImage
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.lib.utils.utils import print_boxes
|
||||
from prowler.providers.common.models import Audit_Metadata, Connection
|
||||
from prowler.providers.common.provider import Provider
|
||||
from prowler.providers.image.exceptions.exceptions import (
|
||||
ImageFindingProcessingError,
|
||||
ImageListFileNotFoundError,
|
||||
ImageListFileReadError,
|
||||
ImageNoImagesProvidedError,
|
||||
ImageTrivyBinaryNotFoundError,
|
||||
)
|
||||
|
||||
|
||||
class ImageProvider(Provider):
|
||||
"""
|
||||
Container Image Provider using Trivy for vulnerability and secret scanning.
|
||||
|
||||
This is a Tool/Wrapper provider that delegates all scanning logic to Trivy's
|
||||
`trivy image` command and converts the output to Prowler's finding format.
|
||||
"""
|
||||
|
||||
_type: str = "image"
|
||||
audit_metadata: Audit_Metadata
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
images: list[str] | None = None,
|
||||
image_list_file: str = None,
|
||||
scanners: list[str] | None = None,
|
||||
trivy_severity: list[str] | None = None,
|
||||
ignore_unfixed: bool = False,
|
||||
timeout: str = "5m",
|
||||
config_path: str = None,
|
||||
config_content: dict = None,
|
||||
fixer_config: dict | None = None,
|
||||
registry_username: str = None,
|
||||
registry_password: str = None,
|
||||
registry_token: str = None,
|
||||
):
|
||||
logger.info("Instantiating Image Provider...")
|
||||
|
||||
self.images = images if images is not None else []
|
||||
self.image_list_file = image_list_file
|
||||
self.scanners = scanners if scanners is not None else ["vuln", "secret"]
|
||||
self.trivy_severity = trivy_severity if trivy_severity is not None else []
|
||||
self.ignore_unfixed = ignore_unfixed
|
||||
self.timeout = timeout
|
||||
self.region = "container"
|
||||
self.audited_account = "image-scan"
|
||||
self._session = None
|
||||
self._identity = "prowler"
|
||||
|
||||
# Registry authentication (follows IaC pattern: explicit params, env vars internal)
|
||||
self.registry_username = registry_username or os.environ.get("TRIVY_USERNAME")
|
||||
self.registry_password = registry_password or os.environ.get("TRIVY_PASSWORD")
|
||||
self.registry_token = registry_token or os.environ.get("TRIVY_REGISTRY_TOKEN")
|
||||
|
||||
if self.registry_username and self.registry_password:
|
||||
self._auth_method = "Basic auth"
|
||||
logger.info("Using registry username/password for authentication")
|
||||
elif self.registry_token:
|
||||
self._auth_method = "Registry token"
|
||||
logger.info("Using registry token for authentication")
|
||||
else:
|
||||
self._auth_method = "No auth"
|
||||
|
||||
# Load images from file if provided
|
||||
if image_list_file:
|
||||
self._load_images_from_file(image_list_file)
|
||||
|
||||
if not self.images:
|
||||
raise ImageNoImagesProvidedError(
|
||||
file=__file__,
|
||||
message="No images provided for scanning.",
|
||||
)
|
||||
|
||||
# Audit Config
|
||||
if config_content:
|
||||
self._audit_config = config_content
|
||||
else:
|
||||
if not config_path:
|
||||
config_path = default_config_file_path
|
||||
self._audit_config = load_and_validate_config_file(self._type, config_path)
|
||||
|
||||
# Fixer Config
|
||||
self._fixer_config = fixer_config if fixer_config is not None else {}
|
||||
|
||||
# Mutelist (not needed for Image provider since Trivy has its own logic)
|
||||
self._mutelist = None
|
||||
|
||||
self.audit_metadata = Audit_Metadata(
|
||||
provider=self._type,
|
||||
account_id=self.audited_account,
|
||||
account_name="image",
|
||||
region=self.region,
|
||||
services_scanned=0,
|
||||
expected_checks=[],
|
||||
completed_checks=0,
|
||||
audit_progress=0,
|
||||
)
|
||||
|
||||
Provider.set_global_provider(self)
|
||||
|
||||
def _load_images_from_file(self, file_path: str) -> None:
|
||||
"""Load image names from a file (one per line)."""
|
||||
try:
|
||||
with open(file_path, "r") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line and not line.startswith("#"):
|
||||
self.images.append(line)
|
||||
logger.info(f"Loaded {len(self.images)} images from {file_path}")
|
||||
except FileNotFoundError:
|
||||
raise ImageListFileNotFoundError(
|
||||
file=file_path,
|
||||
message=f"Image list file not found: {file_path}",
|
||||
)
|
||||
except Exception as error:
|
||||
raise ImageListFileReadError(
|
||||
file=file_path,
|
||||
original_exception=error,
|
||||
message=f"Error reading image list file: {error}",
|
||||
)
|
||||
|
||||
@property
|
||||
def auth_method(self) -> str:
|
||||
return self._auth_method
|
||||
|
||||
@property
|
||||
def type(self) -> str:
|
||||
return self._type
|
||||
|
||||
@property
|
||||
def identity(self) -> str:
|
||||
return self._identity
|
||||
|
||||
@property
|
||||
def session(self) -> None:
|
||||
return self._session
|
||||
|
||||
@property
|
||||
def audit_config(self) -> dict:
|
||||
return self._audit_config
|
||||
|
||||
@property
|
||||
def fixer_config(self) -> dict:
|
||||
return self._fixer_config
|
||||
|
||||
def setup_session(self) -> None:
|
||||
"""Image provider doesn't need a session since it uses Trivy directly"""
|
||||
return None
|
||||
|
||||
def _process_finding(
|
||||
self, finding: dict, image_name: str, finding_type: str
|
||||
) -> CheckReportImage:
|
||||
"""
|
||||
Process a single finding and create a CheckReportImage object.
|
||||
|
||||
Args:
|
||||
finding: The finding object from Trivy output
|
||||
image_name: The container image name being scanned
|
||||
finding_type: The type of finding (Vulnerability, Secret, etc.)
|
||||
|
||||
Returns:
|
||||
CheckReportImage: The processed check report
|
||||
"""
|
||||
try:
|
||||
# Determine finding ID based on type
|
||||
if "VulnerabilityID" in finding:
|
||||
finding_id = finding["VulnerabilityID"]
|
||||
finding_description = finding.get(
|
||||
"Description", finding.get("Title", "")
|
||||
)
|
||||
finding_status = "FAIL"
|
||||
elif "RuleID" in finding:
|
||||
# Secret finding
|
||||
finding_id = finding["RuleID"]
|
||||
finding_description = finding.get("Title", "Secret detected")
|
||||
finding_status = "FAIL"
|
||||
else:
|
||||
finding_id = finding.get("ID", "UNKNOWN")
|
||||
finding_description = finding.get("Description", "")
|
||||
finding_status = finding.get("Status", "FAIL")
|
||||
|
||||
# Build remediation text for vulnerabilities
|
||||
remediation_text = ""
|
||||
if finding.get("FixedVersion"):
|
||||
remediation_text = f"Upgrade {finding.get('PkgName', 'package')} to version {finding['FixedVersion']}"
|
||||
elif finding.get("Resolution"):
|
||||
remediation_text = finding["Resolution"]
|
||||
|
||||
# Convert Trivy severity to Prowler severity (lowercase, map UNKNOWN to informational)
|
||||
trivy_severity = finding.get("Severity", "UNKNOWN").lower()
|
||||
if trivy_severity == "unknown":
|
||||
trivy_severity = "informational"
|
||||
|
||||
metadata_dict = {
|
||||
"Provider": "image",
|
||||
"CheckID": finding_id,
|
||||
"CheckTitle": finding.get("Title", finding_id),
|
||||
"CheckType": ["Container Image Security"],
|
||||
"ServiceName": finding_type,
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "",
|
||||
"Severity": trivy_severity,
|
||||
"ResourceType": "container-image",
|
||||
"ResourceGroup": "container",
|
||||
"Description": finding_description,
|
||||
"Risk": finding.get(
|
||||
"Description", "Vulnerability detected in container image"
|
||||
),
|
||||
"RelatedUrl": finding.get("PrimaryURL", ""),
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"NativeIaC": "",
|
||||
"Terraform": "",
|
||||
"CLI": "",
|
||||
"Other": "",
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": remediation_text,
|
||||
"Url": finding.get("PrimaryURL", ""),
|
||||
},
|
||||
},
|
||||
"Categories": [],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": "",
|
||||
}
|
||||
|
||||
# Convert metadata dict to JSON string
|
||||
metadata = json.dumps(metadata_dict)
|
||||
|
||||
report = CheckReportImage(
|
||||
metadata=metadata, finding=finding, image_name=image_name
|
||||
)
|
||||
report.status = finding_status
|
||||
report.status_extended = self._build_status_extended(finding)
|
||||
report.region = self.region
|
||||
return report
|
||||
|
||||
except Exception as error:
|
||||
raise ImageFindingProcessingError(
|
||||
file=__file__,
|
||||
original_exception=error,
|
||||
message=f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}",
|
||||
)
|
||||
|
||||
def _build_status_extended(self, finding: dict) -> str:
|
||||
"""Build a detailed status message for the finding."""
|
||||
parts = []
|
||||
|
||||
if finding.get("VulnerabilityID"):
|
||||
parts.append(f"{finding['VulnerabilityID']}")
|
||||
|
||||
if finding.get("PkgName"):
|
||||
pkg_info = finding["PkgName"]
|
||||
if finding.get("InstalledVersion"):
|
||||
pkg_info += f"@{finding['InstalledVersion']}"
|
||||
parts.append(f"in package {pkg_info}")
|
||||
|
||||
if finding.get("FixedVersion"):
|
||||
parts.append(f"(fix available: {finding['FixedVersion']})")
|
||||
elif finding.get("Status") == "will_not_fix":
|
||||
parts.append("(no fix available)")
|
||||
|
||||
if finding.get("Title"):
|
||||
parts.append(f"- {finding['Title']}")
|
||||
|
||||
return (
|
||||
" ".join(parts) if parts else finding.get("Description", "Finding detected")
|
||||
)
|
||||
|
||||
def run(self) -> List[CheckReportImage]:
|
||||
"""Execute the container image scan."""
|
||||
reports = []
|
||||
for batch in self.run_scan():
|
||||
reports.extend(batch)
|
||||
return reports
|
||||
|
||||
def run_scan(self) -> Generator[List[CheckReportImage], None, None]:
|
||||
"""
|
||||
Run Trivy scan on all configured images.
|
||||
|
||||
Yields:
|
||||
List[CheckReportImage]: Batches of findings
|
||||
"""
|
||||
for image in self.images:
|
||||
try:
|
||||
yield from self._scan_single_image(image)
|
||||
except Exception as error:
|
||||
logger.error(f"Error scanning image {image}: {error}")
|
||||
continue
|
||||
|
||||
def _scan_single_image(
|
||||
self, image: str
|
||||
) -> Generator[List[CheckReportImage], None, None]:
|
||||
"""
|
||||
Scan a single container image with Trivy.
|
||||
|
||||
Args:
|
||||
image: The container image name/tag to scan
|
||||
|
||||
Yields:
|
||||
List[CheckReportImage]: Batches of findings
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Scanning container image: {image}")
|
||||
|
||||
# Build Trivy command
|
||||
trivy_command = [
|
||||
"trivy",
|
||||
"image",
|
||||
"--format",
|
||||
"json",
|
||||
"--scanners",
|
||||
",".join(self.scanners),
|
||||
"--timeout",
|
||||
self.timeout,
|
||||
]
|
||||
|
||||
if self.trivy_severity:
|
||||
trivy_command.extend(["--severity", ",".join(self.trivy_severity)])
|
||||
|
||||
if self.ignore_unfixed:
|
||||
trivy_command.append("--ignore-unfixed")
|
||||
|
||||
trivy_command.append(image)
|
||||
|
||||
# Execute Trivy
|
||||
process = self._execute_trivy(trivy_command, image)
|
||||
|
||||
# Log stderr output
|
||||
if process.stderr:
|
||||
self._log_trivy_stderr(process.stderr)
|
||||
|
||||
# Parse JSON output
|
||||
try:
|
||||
output = json.loads(process.stdout)
|
||||
results = output.get("Results", [])
|
||||
|
||||
if not results:
|
||||
logger.info(f"No findings for image: {image}")
|
||||
return
|
||||
|
||||
except json.JSONDecodeError as error:
|
||||
logger.error(f"Failed to parse Trivy output for {image}: {error}")
|
||||
logger.debug(f"Trivy stdout: {process.stdout[:500]}")
|
||||
return
|
||||
|
||||
# Process findings in batches
|
||||
batch = []
|
||||
batch_size = 100
|
||||
|
||||
for result in results:
|
||||
target = result.get("Target", image)
|
||||
result_type = result.get("Type", "unknown")
|
||||
|
||||
# Process Vulnerabilities
|
||||
for vuln in result.get("Vulnerabilities", []):
|
||||
report = self._process_finding(vuln, target, result_type)
|
||||
batch.append(report)
|
||||
if len(batch) >= batch_size:
|
||||
yield batch
|
||||
batch = []
|
||||
|
||||
# Process Secrets
|
||||
for secret in result.get("Secrets", []):
|
||||
report = self._process_finding(secret, target, "secret")
|
||||
batch.append(report)
|
||||
if len(batch) >= batch_size:
|
||||
yield batch
|
||||
batch = []
|
||||
|
||||
# Process Misconfigurations (from Dockerfile)
|
||||
for misconfig in result.get("Misconfigurations", []):
|
||||
report = self._process_finding(
|
||||
misconfig, target, "misconfiguration"
|
||||
)
|
||||
batch.append(report)
|
||||
if len(batch) >= batch_size:
|
||||
yield batch
|
||||
batch = []
|
||||
|
||||
# Yield remaining findings
|
||||
if batch:
|
||||
yield batch
|
||||
|
||||
except Exception as error:
|
||||
if "No such file or directory: 'trivy'" in str(error):
|
||||
raise ImageTrivyBinaryNotFoundError(
|
||||
file=__file__,
|
||||
original_exception=error,
|
||||
message="Trivy binary not found. Please install Trivy from https://trivy.dev/latest/getting-started/installation/",
|
||||
)
|
||||
logger.error(f"Error scanning image {image}: {error}")
|
||||
|
||||
def _build_trivy_env(self) -> dict:
|
||||
"""Build environment variables for Trivy, injecting registry credentials."""
|
||||
env = dict(os.environ)
|
||||
if self.registry_username and self.registry_password:
|
||||
env["TRIVY_USERNAME"] = self.registry_username
|
||||
env["TRIVY_PASSWORD"] = self.registry_password
|
||||
elif self.registry_token:
|
||||
env["TRIVY_REGISTRY_TOKEN"] = self.registry_token
|
||||
return env
|
||||
|
||||
def _execute_trivy(self, command: list, image: str) -> subprocess.CompletedProcess:
|
||||
"""Execute Trivy command with optional progress bar."""
|
||||
env = self._build_trivy_env()
|
||||
try:
|
||||
if sys.stdout.isatty():
|
||||
with alive_bar(
|
||||
ctrl_c=False,
|
||||
bar="blocks",
|
||||
spinner="classic",
|
||||
stats=False,
|
||||
enrich_print=False,
|
||||
) as bar:
|
||||
bar.title = f"-> Scanning {image}..."
|
||||
process = subprocess.run(
|
||||
command,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
env=env,
|
||||
)
|
||||
bar.title = f"-> Scan completed for {image}"
|
||||
return process
|
||||
else:
|
||||
logger.info(f"Scanning {image}...")
|
||||
process = subprocess.run(
|
||||
command,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
env=env,
|
||||
)
|
||||
logger.info(f"Scan completed for {image}")
|
||||
return process
|
||||
except (AttributeError, OSError):
|
||||
logger.info(f"Scanning {image}...")
|
||||
return subprocess.run(command, capture_output=True, text=True, env=env)
|
||||
|
||||
def _log_trivy_stderr(self, stderr: str) -> None:
|
||||
"""Parse and log Trivy's stderr output."""
|
||||
for line in stderr.strip().split("\n"):
|
||||
if line.strip():
|
||||
parts = line.split()
|
||||
if len(parts) >= 3:
|
||||
level = parts[1]
|
||||
message = " ".join(parts[2:])
|
||||
if level == "ERROR":
|
||||
logger.error(message)
|
||||
elif level == "WARN":
|
||||
logger.warning(message)
|
||||
elif level == "INFO":
|
||||
logger.info(message)
|
||||
elif level == "DEBUG":
|
||||
logger.debug(message)
|
||||
else:
|
||||
logger.info(message)
|
||||
else:
|
||||
logger.info(line)
|
||||
|
||||
def print_credentials(self) -> None:
|
||||
"""Print scan configuration."""
|
||||
report_title = f"{Style.BRIGHT}Scanning container images:{Style.RESET_ALL}"
|
||||
|
||||
report_lines = []
|
||||
if len(self.images) <= 3:
|
||||
for img in self.images:
|
||||
report_lines.append(f"Image: {Fore.YELLOW}{img}{Style.RESET_ALL}")
|
||||
else:
|
||||
report_lines.append(
|
||||
f"Images: {Fore.YELLOW}{len(self.images)} images{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
report_lines.append(
|
||||
f"Scanners: {Fore.YELLOW}{', '.join(self.scanners)}{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
if self.trivy_severity:
|
||||
report_lines.append(
|
||||
f"Severity filter: {Fore.YELLOW}{', '.join(self.trivy_severity)}{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
if self.ignore_unfixed:
|
||||
report_lines.append(f"Ignore unfixed: {Fore.YELLOW}Yes{Style.RESET_ALL}")
|
||||
|
||||
report_lines.append(f"Timeout: {Fore.YELLOW}{self.timeout}{Style.RESET_ALL}")
|
||||
|
||||
report_lines.append(
|
||||
f"Authentication method: {Fore.YELLOW}{self.auth_method}{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
print_boxes(report_lines, report_title)
|
||||
|
||||
@staticmethod
|
||||
def test_connection(
|
||||
image: str = None,
|
||||
raise_on_exception: bool = True,
|
||||
provider_id: str = None,
|
||||
registry_username: str = None,
|
||||
registry_password: str = None,
|
||||
registry_token: str = None,
|
||||
) -> "Connection":
|
||||
"""
|
||||
Test connection to container registry by attempting to inspect an image.
|
||||
|
||||
Args:
|
||||
image: Container image to test
|
||||
raise_on_exception: Whether to raise exceptions
|
||||
provider_id: Fallback for image name
|
||||
registry_username: Registry username for basic auth
|
||||
registry_password: Registry password for basic auth
|
||||
registry_token: Registry token for token-based auth
|
||||
|
||||
Returns:
|
||||
Connection: Connection object with success status
|
||||
"""
|
||||
try:
|
||||
if provider_id and not image:
|
||||
image = provider_id
|
||||
|
||||
if not image:
|
||||
return Connection(is_connected=False, error="Image name is required")
|
||||
|
||||
# Build env with registry credentials
|
||||
env = dict(os.environ)
|
||||
if registry_username and registry_password:
|
||||
env["TRIVY_USERNAME"] = registry_username
|
||||
env["TRIVY_PASSWORD"] = registry_password
|
||||
elif registry_token:
|
||||
env["TRIVY_REGISTRY_TOKEN"] = registry_token
|
||||
|
||||
# Test by running trivy with --skip-update to just test image access
|
||||
process = subprocess.run(
|
||||
[
|
||||
"trivy",
|
||||
"image",
|
||||
"--skip-db-update",
|
||||
"--download-db-only=false",
|
||||
image,
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=60,
|
||||
env=env,
|
||||
)
|
||||
|
||||
if process.returncode == 0:
|
||||
return Connection(is_connected=True)
|
||||
else:
|
||||
error_msg = process.stderr or "Unknown error"
|
||||
if "401" in error_msg or "unauthorized" in error_msg.lower():
|
||||
return Connection(
|
||||
is_connected=False,
|
||||
error="Authentication failed. Check registry credentials.",
|
||||
)
|
||||
elif "not found" in error_msg.lower() or "404" in error_msg:
|
||||
return Connection(
|
||||
is_connected=False,
|
||||
error="Image not found in registry.",
|
||||
)
|
||||
else:
|
||||
return Connection(
|
||||
is_connected=False,
|
||||
error=f"Failed to access image: {error_msg[:200]}",
|
||||
)
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
return Connection(
|
||||
is_connected=False,
|
||||
error="Connection timed out",
|
||||
)
|
||||
except FileNotFoundError:
|
||||
return Connection(
|
||||
is_connected=False,
|
||||
error="Trivy binary not found. Please install Trivy.",
|
||||
)
|
||||
except Exception as error:
|
||||
if raise_on_exception:
|
||||
raise
|
||||
return Connection(
|
||||
is_connected=False,
|
||||
error=f"Unexpected error: {str(error)}",
|
||||
)
|
||||
0
prowler/providers/image/lib/__init__.py
Normal file
0
prowler/providers/image/lib/__init__.py
Normal file
0
prowler/providers/image/lib/arguments/__init__.py
Normal file
0
prowler/providers/image/lib/arguments/__init__.py
Normal file
113
prowler/providers/image/lib/arguments/arguments.py
Normal file
113
prowler/providers/image/lib/arguments/arguments.py
Normal file
@@ -0,0 +1,113 @@
|
||||
SCANNERS_CHOICES = [
|
||||
"vuln",
|
||||
"secret",
|
||||
"misconfig",
|
||||
"license",
|
||||
]
|
||||
|
||||
SEVERITY_CHOICES = [
|
||||
"CRITICAL",
|
||||
"HIGH",
|
||||
"MEDIUM",
|
||||
"LOW",
|
||||
"UNKNOWN",
|
||||
]
|
||||
|
||||
|
||||
def init_parser(self):
|
||||
"""Init the Image Provider CLI parser"""
|
||||
image_parser = self.subparsers.add_parser(
|
||||
"image", parents=[self.common_providers_parser], help="Container Image Provider"
|
||||
)
|
||||
|
||||
# Image Selection
|
||||
image_selection_group = image_parser.add_argument_group("Image Selection")
|
||||
image_selection_group.add_argument(
|
||||
"--image",
|
||||
"-I",
|
||||
dest="images",
|
||||
action="append",
|
||||
default=[],
|
||||
help="Container image to scan. Can be specified multiple times. Examples: nginx:latest, alpine:3.18, myregistry.io/myapp:v1.0",
|
||||
)
|
||||
|
||||
image_selection_group.add_argument(
|
||||
"--image-list",
|
||||
dest="image_list_file",
|
||||
default=None,
|
||||
help="Path to a file containing list of images to scan (one per line). Lines starting with # are treated as comments.",
|
||||
)
|
||||
|
||||
# Scan Configuration
|
||||
scan_config_group = image_parser.add_argument_group("Scan Configuration")
|
||||
scan_config_group.add_argument(
|
||||
"--scanners",
|
||||
"--scanner",
|
||||
dest="scanners",
|
||||
nargs="+",
|
||||
default=["vuln", "secret"],
|
||||
choices=SCANNERS_CHOICES,
|
||||
help="Trivy scanners to use. Default: vuln, secret. Available: vuln, secret, misconfig, license",
|
||||
)
|
||||
|
||||
scan_config_group.add_argument(
|
||||
"--trivy-severity",
|
||||
dest="trivy_severity",
|
||||
nargs="+",
|
||||
default=[],
|
||||
choices=SEVERITY_CHOICES,
|
||||
help="Filter Trivy findings by severity. Default: all severities. Available: CRITICAL, HIGH, MEDIUM, LOW, UNKNOWN",
|
||||
)
|
||||
|
||||
scan_config_group.add_argument(
|
||||
"--ignore-unfixed",
|
||||
dest="ignore_unfixed",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="Ignore vulnerabilities without available fixes.",
|
||||
)
|
||||
|
||||
scan_config_group.add_argument(
|
||||
"--timeout",
|
||||
dest="timeout",
|
||||
default="5m",
|
||||
help="Trivy scan timeout. Default: 5m. Examples: 10m, 1h",
|
||||
)
|
||||
|
||||
# Registry Authentication
|
||||
registry_auth_group = image_parser.add_argument_group("Registry Authentication")
|
||||
registry_auth_group.add_argument(
|
||||
"--registry-username",
|
||||
dest="registry_username",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="Username for private registry authentication (used with --registry-password). If not provided, will use TRIVY_USERNAME env var.",
|
||||
)
|
||||
registry_auth_group.add_argument(
|
||||
"--registry-password",
|
||||
dest="registry_password",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="Password for private registry authentication (used with --registry-username). If not provided, will use TRIVY_PASSWORD env var.",
|
||||
)
|
||||
registry_auth_group.add_argument(
|
||||
"--registry-token",
|
||||
dest="registry_token",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="Token for private registry authentication. If not provided, will use TRIVY_REGISTRY_TOKEN env var.",
|
||||
)
|
||||
|
||||
|
||||
def validate_arguments(arguments):
|
||||
"""Validate Image provider arguments."""
|
||||
images = getattr(arguments, "images", [])
|
||||
image_list_file = getattr(arguments, "image_list_file", None)
|
||||
|
||||
if not images and not image_list_file:
|
||||
return (
|
||||
False,
|
||||
"At least one image must be specified using --image (-I) or --image-list.",
|
||||
)
|
||||
|
||||
return (True, "")
|
||||
21
prowler/providers/image/models.py
Normal file
21
prowler/providers/image/models.py
Normal file
@@ -0,0 +1,21 @@
|
||||
from prowler.config.config import output_file_timestamp
|
||||
from prowler.providers.common.models import ProviderOutputOptions
|
||||
|
||||
|
||||
class ImageOutputOptions(ProviderOutputOptions):
|
||||
"""
|
||||
ImageOutputOptions customizes output filename logic for container image scanning.
|
||||
|
||||
Attributes inherited from ProviderOutputOptions:
|
||||
- output_filename (str): The base filename used for generated reports.
|
||||
- output_directory (str): The directory to store the output files.
|
||||
"""
|
||||
|
||||
def __init__(self, arguments, bulk_checks_metadata):
|
||||
super().__init__(arguments, bulk_checks_metadata)
|
||||
|
||||
# If --output-filename is not specified, build a default name
|
||||
if not getattr(arguments, "output_filename", None):
|
||||
self.output_filename = f"prowler-output-image-{output_file_timestamp}"
|
||||
else:
|
||||
self.output_filename = arguments.output_filename
|
||||
92
tests/providers/image/image_fixtures.py
Normal file
92
tests/providers/image/image_fixtures.py
Normal file
@@ -0,0 +1,92 @@
|
||||
import json
|
||||
|
||||
# Sample vulnerability finding from Trivy
|
||||
SAMPLE_VULNERABILITY_FINDING = {
|
||||
"VulnerabilityID": "CVE-2024-1234",
|
||||
"PkgID": "openssl@1.1.1k-r0",
|
||||
"PkgName": "openssl",
|
||||
"InstalledVersion": "1.1.1k-r0",
|
||||
"FixedVersion": "1.1.1l-r0",
|
||||
"Severity": "HIGH",
|
||||
"Title": "OpenSSL Buffer Overflow",
|
||||
"Description": "A buffer overflow vulnerability in OpenSSL allows remote attackers to execute arbitrary code.",
|
||||
"PrimaryURL": "https://avd.aquasec.com/nvd/cve-2024-1234",
|
||||
}
|
||||
|
||||
# Sample secret finding from Trivy
|
||||
SAMPLE_SECRET_FINDING = {
|
||||
"RuleID": "aws-access-key-id",
|
||||
"Category": "AWS",
|
||||
"Severity": "CRITICAL",
|
||||
"Title": "AWS Access Key ID",
|
||||
"StartLine": 10,
|
||||
"EndLine": 10,
|
||||
"Match": "AKIA...",
|
||||
}
|
||||
|
||||
# Sample misconfiguration finding from Trivy
|
||||
SAMPLE_MISCONFIGURATION_FINDING = {
|
||||
"ID": "DS001",
|
||||
"Title": "Dockerfile should not use latest tag",
|
||||
"Description": "Using latest tag can cause unpredictable builds.",
|
||||
"Severity": "MEDIUM",
|
||||
"Resolution": "Use a specific version tag instead of latest",
|
||||
"PrimaryURL": "https://avd.aquasec.com/misconfig/ds001",
|
||||
}
|
||||
|
||||
# Sample finding with UNKNOWN severity
|
||||
SAMPLE_UNKNOWN_SEVERITY_FINDING = {
|
||||
"VulnerabilityID": "CVE-2024-9999",
|
||||
"PkgID": "test-pkg@0.0.1",
|
||||
"PkgName": "test-pkg",
|
||||
"InstalledVersion": "0.0.1",
|
||||
"Severity": "UNKNOWN",
|
||||
"Title": "Unknown severity issue",
|
||||
"Description": "An issue with unknown severity.",
|
||||
}
|
||||
|
||||
# Full Trivy JSON output structure with a single vulnerability
|
||||
SAMPLE_TRIVY_IMAGE_OUTPUT = {
|
||||
"Results": [
|
||||
{
|
||||
"Target": "alpine:3.18 (alpine 3.18.0)",
|
||||
"Type": "alpine",
|
||||
"Vulnerabilities": [SAMPLE_VULNERABILITY_FINDING],
|
||||
"Secrets": [],
|
||||
"Misconfigurations": [],
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
# Full Trivy JSON output with mixed finding types
|
||||
SAMPLE_TRIVY_MULTI_TYPE_OUTPUT = {
|
||||
"Results": [
|
||||
{
|
||||
"Target": "myimage:latest (debian 12)",
|
||||
"Type": "debian",
|
||||
"Vulnerabilities": [SAMPLE_VULNERABILITY_FINDING],
|
||||
"Secrets": [SAMPLE_SECRET_FINDING],
|
||||
"Misconfigurations": [SAMPLE_MISCONFIGURATION_FINDING],
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
def get_sample_trivy_json_output():
|
||||
"""Return sample Trivy JSON output as string."""
|
||||
return json.dumps(SAMPLE_TRIVY_IMAGE_OUTPUT)
|
||||
|
||||
|
||||
def get_empty_trivy_output():
|
||||
"""Return empty Trivy output as string."""
|
||||
return json.dumps({"Results": []})
|
||||
|
||||
|
||||
def get_invalid_trivy_output():
|
||||
"""Return invalid JSON output as string."""
|
||||
return "invalid json output"
|
||||
|
||||
|
||||
def get_multi_type_trivy_output():
|
||||
"""Return Trivy output with multiple finding types as string."""
|
||||
return json.dumps(SAMPLE_TRIVY_MULTI_TYPE_OUTPUT)
|
||||
512
tests/providers/image/image_provider_test.py
Normal file
512
tests/providers/image/image_provider_test.py
Normal file
@@ -0,0 +1,512 @@
|
||||
import os
|
||||
import tempfile
|
||||
from unittest import mock
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from prowler.lib.check.models import CheckReportImage
|
||||
from prowler.providers.image.exceptions.exceptions import (
|
||||
ImageListFileNotFoundError,
|
||||
ImageNoImagesProvidedError,
|
||||
ImageTrivyBinaryNotFoundError,
|
||||
)
|
||||
from prowler.providers.image.image_provider import ImageProvider
|
||||
from tests.providers.image.image_fixtures import (
|
||||
SAMPLE_MISCONFIGURATION_FINDING,
|
||||
SAMPLE_SECRET_FINDING,
|
||||
SAMPLE_UNKNOWN_SEVERITY_FINDING,
|
||||
SAMPLE_VULNERABILITY_FINDING,
|
||||
get_empty_trivy_output,
|
||||
get_invalid_trivy_output,
|
||||
get_multi_type_trivy_output,
|
||||
get_sample_trivy_json_output,
|
||||
)
|
||||
|
||||
|
||||
def _make_provider(**kwargs):
|
||||
"""Helper to create an ImageProvider with test defaults."""
|
||||
defaults = {
|
||||
"images": ["alpine:3.18"],
|
||||
"config_content": {},
|
||||
}
|
||||
defaults.update(kwargs)
|
||||
return ImageProvider(**defaults)
|
||||
|
||||
|
||||
class TestImageProvider:
|
||||
def test_image_provider(self):
|
||||
"""Test default initialization."""
|
||||
provider = _make_provider()
|
||||
|
||||
assert provider._type == "image"
|
||||
assert provider.type == "image"
|
||||
assert provider.images == ["alpine:3.18"]
|
||||
assert provider.scanners == ["vuln", "secret"]
|
||||
assert provider.trivy_severity == []
|
||||
assert provider.ignore_unfixed is False
|
||||
assert provider.timeout == "5m"
|
||||
assert provider.region == "container"
|
||||
assert provider.audited_account == "image-scan"
|
||||
assert provider.identity == "prowler"
|
||||
assert provider.auth_method == "No auth"
|
||||
assert provider.session is None
|
||||
assert provider.audit_config == {}
|
||||
assert provider.fixer_config == {}
|
||||
assert provider._mutelist is None
|
||||
|
||||
def test_image_provider_custom_params(self):
|
||||
"""Test initialization with custom parameters."""
|
||||
provider = _make_provider(
|
||||
images=["nginx:1.25", "redis:7"],
|
||||
scanners=["vuln", "secret", "misconfig"],
|
||||
trivy_severity=["HIGH", "CRITICAL"],
|
||||
ignore_unfixed=True,
|
||||
timeout="10m",
|
||||
fixer_config={"key": "value"},
|
||||
)
|
||||
|
||||
assert provider.images == ["nginx:1.25", "redis:7"]
|
||||
assert provider.scanners == ["vuln", "secret", "misconfig"]
|
||||
assert provider.trivy_severity == ["HIGH", "CRITICAL"]
|
||||
assert provider.ignore_unfixed is True
|
||||
assert provider.timeout == "10m"
|
||||
assert provider.fixer_config == {"key": "value"}
|
||||
|
||||
def test_image_provider_with_image_list_file(self):
|
||||
"""Test loading images from a file, skipping comments and blank lines."""
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
|
||||
f.write("# Comment line\n")
|
||||
f.write("alpine:3.18\n")
|
||||
f.write("\n")
|
||||
f.write(" nginx:latest \n")
|
||||
f.write("# Another comment\n")
|
||||
f.write("redis:7\n")
|
||||
f.name
|
||||
|
||||
provider = _make_provider(
|
||||
images=None,
|
||||
image_list_file=f.name,
|
||||
)
|
||||
|
||||
assert "alpine:3.18" in provider.images
|
||||
assert "nginx:latest" in provider.images
|
||||
assert "redis:7" in provider.images
|
||||
assert len(provider.images) == 3
|
||||
|
||||
def test_image_provider_no_images(self):
|
||||
"""Test that ImageNoImagesProvidedError is raised when no images are given."""
|
||||
with pytest.raises(ImageNoImagesProvidedError):
|
||||
_make_provider(images=[])
|
||||
|
||||
def test_image_provider_image_list_file_not_found(self):
|
||||
"""Test that ImageListFileNotFoundError is raised for missing file."""
|
||||
with pytest.raises(ImageListFileNotFoundError):
|
||||
_make_provider(
|
||||
images=None,
|
||||
image_list_file="/nonexistent/path/images.txt",
|
||||
)
|
||||
|
||||
def test_process_finding_vulnerability(self):
|
||||
"""Test processing a vulnerability finding."""
|
||||
provider = _make_provider()
|
||||
report = provider._process_finding(
|
||||
SAMPLE_VULNERABILITY_FINDING,
|
||||
"alpine:3.18 (alpine 3.18.0)",
|
||||
"alpine",
|
||||
)
|
||||
|
||||
assert isinstance(report, CheckReportImage)
|
||||
assert report.status == "FAIL"
|
||||
assert report.check_metadata.CheckID == "CVE-2024-1234"
|
||||
assert report.check_metadata.Severity == "high"
|
||||
assert report.check_metadata.ServiceName == "alpine"
|
||||
assert report.check_metadata.ResourceType == "container-image"
|
||||
assert report.check_metadata.ResourceGroup == "container"
|
||||
assert report.package_name == "openssl"
|
||||
assert report.installed_version == "1.1.1k-r0"
|
||||
assert report.fixed_version == "1.1.1l-r0"
|
||||
assert report.resource_name == "alpine:3.18 (alpine 3.18.0)"
|
||||
assert report.region == "container"
|
||||
|
||||
def test_process_finding_secret(self):
|
||||
"""Test processing a secret finding (identified by RuleID)."""
|
||||
provider = _make_provider()
|
||||
report = provider._process_finding(
|
||||
SAMPLE_SECRET_FINDING,
|
||||
"myimage:latest",
|
||||
"secret",
|
||||
)
|
||||
|
||||
assert isinstance(report, CheckReportImage)
|
||||
assert report.status == "FAIL"
|
||||
assert report.check_metadata.CheckID == "aws-access-key-id"
|
||||
assert report.check_metadata.Severity == "critical"
|
||||
assert report.check_metadata.ServiceName == "secret"
|
||||
|
||||
def test_process_finding_misconfiguration(self):
|
||||
"""Test processing a misconfiguration finding (identified by ID)."""
|
||||
provider = _make_provider()
|
||||
report = provider._process_finding(
|
||||
SAMPLE_MISCONFIGURATION_FINDING,
|
||||
"myimage:latest",
|
||||
"misconfiguration",
|
||||
)
|
||||
|
||||
assert isinstance(report, CheckReportImage)
|
||||
assert report.check_metadata.CheckID == "DS001"
|
||||
assert report.check_metadata.Severity == "medium"
|
||||
assert report.check_metadata.ServiceName == "misconfiguration"
|
||||
|
||||
def test_process_finding_unknown_severity(self):
|
||||
"""Test that UNKNOWN severity is mapped to informational."""
|
||||
provider = _make_provider()
|
||||
report = provider._process_finding(
|
||||
SAMPLE_UNKNOWN_SEVERITY_FINDING,
|
||||
"myimage:latest",
|
||||
"alpine",
|
||||
)
|
||||
|
||||
assert report.check_metadata.Severity == "informational"
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_success(self, mock_subprocess):
|
||||
"""Test successful scan with mocked subprocess."""
|
||||
provider = _make_provider()
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
|
||||
reports = []
|
||||
for batch in provider.run_scan():
|
||||
reports.extend(batch)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].check_metadata.CheckID == "CVE-2024-1234"
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_empty_output(self, mock_subprocess):
|
||||
"""Test scan with empty Trivy output produces no findings."""
|
||||
provider = _make_provider()
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
stdout=get_empty_trivy_output(), stderr=""
|
||||
)
|
||||
|
||||
reports = []
|
||||
for batch in provider.run_scan():
|
||||
reports.extend(batch)
|
||||
|
||||
assert len(reports) == 0
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_invalid_json(self, mock_subprocess):
|
||||
"""Test scan with malformed output doesn't crash."""
|
||||
provider = _make_provider()
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
stdout=get_invalid_trivy_output(), stderr=""
|
||||
)
|
||||
|
||||
reports = []
|
||||
for batch in provider.run_scan():
|
||||
reports.extend(batch)
|
||||
|
||||
assert len(reports) == 0
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_trivy_not_found(self, mock_subprocess):
|
||||
"""Test that ImageTrivyBinaryNotFoundError is raised when trivy is missing."""
|
||||
provider = _make_provider()
|
||||
mock_subprocess.side_effect = FileNotFoundError(
|
||||
"[Errno 2] No such file or directory: 'trivy'"
|
||||
)
|
||||
|
||||
with pytest.raises(ImageTrivyBinaryNotFoundError):
|
||||
for _ in provider._scan_single_image("alpine:3.18"):
|
||||
pass
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_multiple_images(self, mock_subprocess):
|
||||
"""Test scanning multiple images makes separate subprocess calls."""
|
||||
provider = _make_provider(images=["alpine:3.18", "nginx:latest"])
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
|
||||
reports = []
|
||||
for batch in provider.run_scan():
|
||||
reports.extend(batch)
|
||||
|
||||
assert mock_subprocess.call_count == 2
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_run_scan_multi_type_output(self, mock_subprocess):
|
||||
"""Test scan with vulnerabilities, secrets, and misconfigurations."""
|
||||
provider = _make_provider()
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
stdout=get_multi_type_trivy_output(), stderr=""
|
||||
)
|
||||
|
||||
reports = []
|
||||
for batch in provider.run_scan():
|
||||
reports.extend(batch)
|
||||
|
||||
assert len(reports) == 3
|
||||
|
||||
check_ids = [r.check_metadata.CheckID for r in reports]
|
||||
assert "CVE-2024-1234" in check_ids
|
||||
assert "aws-access-key-id" in check_ids
|
||||
assert "DS001" in check_ids
|
||||
|
||||
def test_print_credentials(self):
|
||||
"""Test that print_credentials outputs image names."""
|
||||
provider = _make_provider()
|
||||
with mock.patch("builtins.print") as mock_print:
|
||||
provider.print_credentials()
|
||||
output = " ".join(
|
||||
str(call.args[0]) for call in mock_print.call_args_list if call.args
|
||||
)
|
||||
assert "alpine:3.18" in output
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_test_connection_success(self, mock_subprocess):
|
||||
"""Test successful connection returns is_connected=True."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
|
||||
|
||||
result = ImageProvider.test_connection(image="alpine:3.18")
|
||||
|
||||
assert result.is_connected is True
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_test_connection_auth_failure(self, mock_subprocess):
|
||||
"""Test 401 error returns auth failure."""
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=1, stderr="401 unauthorized"
|
||||
)
|
||||
|
||||
result = ImageProvider.test_connection(image="private/image:latest")
|
||||
|
||||
assert result.is_connected is False
|
||||
assert "Authentication failed" in result.error
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_test_connection_not_found(self, mock_subprocess):
|
||||
"""Test 404 error returns not found."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=1, stderr="404 not found")
|
||||
|
||||
result = ImageProvider.test_connection(image="nonexistent/image:latest")
|
||||
|
||||
assert result.is_connected is False
|
||||
assert "not found" in result.error
|
||||
|
||||
def test_build_status_extended(self):
|
||||
"""Test status message content for different finding types."""
|
||||
provider = _make_provider()
|
||||
|
||||
# Vulnerability with fix
|
||||
status = provider._build_status_extended(SAMPLE_VULNERABILITY_FINDING)
|
||||
assert "CVE-2024-1234" in status
|
||||
assert "openssl" in status
|
||||
assert "fix available" in status
|
||||
|
||||
# Finding with no special fields
|
||||
status = provider._build_status_extended({"Description": "Simple finding"})
|
||||
assert status == "Simple finding"
|
||||
|
||||
# Finding with will_not_fix status
|
||||
finding_no_fix = {
|
||||
"VulnerabilityID": "CVE-2024-0000",
|
||||
"PkgName": "libc",
|
||||
"Status": "will_not_fix",
|
||||
"Title": "Some vuln",
|
||||
}
|
||||
status = provider._build_status_extended(finding_no_fix)
|
||||
assert "no fix available" in status
|
||||
|
||||
def test_validate_arguments(self):
|
||||
"""Test valid and invalid argument combinations."""
|
||||
# Valid: images provided
|
||||
provider = _make_provider(images=["alpine:3.18"])
|
||||
assert provider.images == ["alpine:3.18"]
|
||||
|
||||
# Invalid: empty images and no file
|
||||
with pytest.raises(ImageNoImagesProvidedError):
|
||||
_make_provider(images=[])
|
||||
|
||||
# Valid: custom scanners
|
||||
provider = _make_provider(scanners=["vuln"])
|
||||
assert provider.scanners == ["vuln"]
|
||||
|
||||
def test_setup_session(self):
|
||||
"""Test that setup_session returns None."""
|
||||
provider = _make_provider()
|
||||
assert provider.setup_session() is None
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_run_method(self, mock_subprocess):
|
||||
"""Test that run() collects all batches into a list."""
|
||||
provider = _make_provider()
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
|
||||
reports = provider.run()
|
||||
|
||||
assert isinstance(reports, list)
|
||||
assert len(reports) == 1
|
||||
|
||||
|
||||
class TestImageProviderRegistryAuth:
|
||||
def test_no_auth_by_default(self):
|
||||
"""Test that no auth is set when no credentials are provided."""
|
||||
provider = _make_provider()
|
||||
|
||||
assert provider.registry_username is None
|
||||
assert provider.registry_password is None
|
||||
assert provider.registry_token is None
|
||||
assert provider.auth_method == "No auth"
|
||||
|
||||
def test_basic_auth_with_explicit_params(self):
|
||||
"""Test basic auth via explicit constructor params."""
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
assert provider.registry_username == "myuser"
|
||||
assert provider.registry_password == "mypass"
|
||||
assert provider.auth_method == "Basic auth"
|
||||
|
||||
def test_token_auth_with_explicit_param(self):
|
||||
"""Test token auth via explicit constructor param."""
|
||||
provider = _make_provider(registry_token="my-token-123")
|
||||
|
||||
assert provider.registry_token == "my-token-123"
|
||||
assert provider.auth_method == "Registry token"
|
||||
|
||||
def test_basic_auth_takes_precedence_over_token(self):
|
||||
"""Test that username/password takes precedence over token."""
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
registry_token="my-token",
|
||||
)
|
||||
|
||||
assert provider.auth_method == "Basic auth"
|
||||
|
||||
@patch.dict(os.environ, {"TRIVY_USERNAME": "envuser", "TRIVY_PASSWORD": "envpass"})
|
||||
def test_basic_auth_from_env_vars(self):
|
||||
"""Test that env vars are used as fallback for basic auth."""
|
||||
provider = _make_provider()
|
||||
|
||||
assert provider.registry_username == "envuser"
|
||||
assert provider.registry_password == "envpass"
|
||||
assert provider.auth_method == "Basic auth"
|
||||
|
||||
@patch.dict(os.environ, {"TRIVY_REGISTRY_TOKEN": "env-token"})
|
||||
def test_token_auth_from_env_var(self):
|
||||
"""Test that env var is used as fallback for token auth."""
|
||||
provider = _make_provider()
|
||||
|
||||
assert provider.registry_token == "env-token"
|
||||
assert provider.auth_method == "Registry token"
|
||||
|
||||
@patch.dict(os.environ, {"TRIVY_USERNAME": "envuser", "TRIVY_PASSWORD": "envpass"})
|
||||
def test_explicit_params_override_env_vars(self):
|
||||
"""Test that explicit params take precedence over env vars."""
|
||||
provider = _make_provider(
|
||||
registry_username="explicit",
|
||||
registry_password="explicit-pass",
|
||||
)
|
||||
|
||||
assert provider.registry_username == "explicit"
|
||||
assert provider.registry_password == "explicit-pass"
|
||||
|
||||
def test_build_trivy_env_no_auth(self):
|
||||
"""Test that _build_trivy_env returns base env when no auth."""
|
||||
provider = _make_provider()
|
||||
env = provider._build_trivy_env()
|
||||
|
||||
assert "TRIVY_USERNAME" not in env
|
||||
assert "TRIVY_PASSWORD" not in env
|
||||
assert "TRIVY_REGISTRY_TOKEN" not in env
|
||||
|
||||
def test_build_trivy_env_basic_auth(self):
|
||||
"""Test that _build_trivy_env injects username/password."""
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
env = provider._build_trivy_env()
|
||||
|
||||
assert env["TRIVY_USERNAME"] == "myuser"
|
||||
assert env["TRIVY_PASSWORD"] == "mypass"
|
||||
|
||||
def test_build_trivy_env_token_auth(self):
|
||||
"""Test that _build_trivy_env injects registry token."""
|
||||
provider = _make_provider(registry_token="my-token")
|
||||
env = provider._build_trivy_env()
|
||||
|
||||
assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_execute_trivy_passes_env(self, mock_subprocess):
|
||||
"""Test that _execute_trivy passes credentials via env."""
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
stdout=get_sample_trivy_json_output(), stderr=""
|
||||
)
|
||||
|
||||
provider._execute_trivy(["trivy", "image", "alpine:3.18"], "alpine:3.18")
|
||||
|
||||
call_kwargs = mock_subprocess.call_args
|
||||
env = call_kwargs.kwargs.get("env") or call_kwargs[1].get("env")
|
||||
assert env["TRIVY_USERNAME"] == "myuser"
|
||||
assert env["TRIVY_PASSWORD"] == "mypass"
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_test_connection_with_basic_auth(self, mock_subprocess):
|
||||
"""Test test_connection passes credentials via env."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
|
||||
|
||||
result = ImageProvider.test_connection(
|
||||
image="private.registry.io/myapp:v1",
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
|
||||
assert result.is_connected is True
|
||||
call_kwargs = mock_subprocess.call_args
|
||||
env = call_kwargs.kwargs.get("env") or call_kwargs[1].get("env")
|
||||
assert env["TRIVY_USERNAME"] == "myuser"
|
||||
assert env["TRIVY_PASSWORD"] == "mypass"
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_test_connection_with_token(self, mock_subprocess):
|
||||
"""Test test_connection passes token via env."""
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
|
||||
|
||||
result = ImageProvider.test_connection(
|
||||
image="private.registry.io/myapp:v1",
|
||||
registry_token="my-token",
|
||||
)
|
||||
|
||||
assert result.is_connected is True
|
||||
call_kwargs = mock_subprocess.call_args
|
||||
env = call_kwargs.kwargs.get("env") or call_kwargs[1].get("env")
|
||||
assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
|
||||
|
||||
def test_print_credentials_shows_auth_method(self):
|
||||
"""Test that print_credentials outputs the auth method."""
|
||||
provider = _make_provider(
|
||||
registry_username="myuser",
|
||||
registry_password="mypass",
|
||||
)
|
||||
with mock.patch("builtins.print") as mock_print:
|
||||
provider.print_credentials()
|
||||
output = " ".join(
|
||||
str(call.args[0]) for call in mock_print.call_args_list if call.args
|
||||
)
|
||||
assert "Basic auth" in output
|
||||
Reference in New Issue
Block a user