diff --git a/docs/user-guide/providers/image/getting-started-image.mdx b/docs/user-guide/providers/image/getting-started-image.mdx
index 4fe509c2af..821478efa6 100644
--- a/docs/user-guide/providers/image/getting-started-image.mdx
+++ b/docs/user-guide/providers/image/getting-started-image.mdx
@@ -10,7 +10,7 @@ Prowler's Image provider enables comprehensive container image security scanning
* **Trivy integration:** Prowler leverages [Trivy](https://trivy.dev/) to scan container images for vulnerabilities, secrets, misconfigurations, and license issues.
* **Trivy required:** Trivy must be installed and available in the system PATH before running any scan.
-* **Authentication:** No registry authentication is required for public images. For private registries, configure Docker credentials via `docker login` before scanning.
+* **Authentication:** No registry authentication is required for public images. For private registries, credentials can be provided via environment variables or manual `docker login`.
* **Output formats:** Results are output in the same formats as other Prowler providers (CSV, JSON, HTML, etc.).
## Prowler CLI
@@ -173,25 +173,147 @@ prowler image -I large-image:latest --timeout 10m
The timeout accepts values in seconds (`s`), minutes (`m`), or hours (`h`). Default: `5m`.
-### Authentication for Private Registries
+### Registry Scan Mode
-The Image provider relies on Trivy for registry authentication. To scan images from private registries, configure Docker credentials before running the scan:
+Registry Scan Mode enumerates and scans all images from an OCI-compatible registry, Docker Hub namespace, or Amazon ECR registry. To activate it, use the `--registry` flag with the registry URL:
```bash
-# Log in to a private registry
-docker login myregistry.io
+prowler image --registry myregistry.io
+```
+
+#### Discover Available Images
+
+To list all repositories and tags available in the registry without running a scan, use the `--registry-list` flag. This is useful for discovering image names and tags before building filter regexes:
+
+```bash
+prowler image --registry myregistry.io --registry-list
+```
+
+Example output:
+
+```text
+Registry: myregistry.io (3 repositories, 8 images)
+
+ api-service (2 tags)
+ latest, v3.1
+ hub-scanner (3 tags)
+ latest, v1.0, v2.0
+ web-frontend (3 tags)
+ latest, v1.0, v2.0
+```
+
+Filters can be combined with `--registry-list` to preview the results before scanning:
+
+```bash
+prowler image --registry myregistry.io --registry-list --image-filter "api.*"
+```
+
+#### Filter Repositories
+
+To filter repositories by name during enumeration, use the `--image-filter` flag with a Python regex pattern (matched via `re.search`):
+
+```bash
+# Scan only repositories starting with "prod/"
+prowler image --registry myregistry.io --image-filter "^prod/"
+```
+
+#### Filter Tags
+
+To filter tags during enumeration, use the `--tag-filter` flag with a Python regex pattern:
+
+```bash
+# Scan only semantic version tags
+prowler image --registry myregistry.io --tag-filter "^v\d+\.\d+\.\d+$"
+```
+
+Both filters can be combined:
+
+```bash
+prowler image --registry myregistry.io --image-filter "^prod/" --tag-filter "^(latest|v\d+)"
+```
+
+#### Limit the Number of Images
+
+To prevent accidentally scanning a large number of images, use the `--max-images` flag. The scan aborts if the discovered image count exceeds the limit:
+
+```bash
+prowler image --registry myregistry.io --max-images 10
+```
+
+Setting `--max-images` to `0` (default) disables the limit.
+
+
+When `--registry-list` is active, the `--max-images` limit is not enforced because no scan is performed.
+
+
+#### Skip TLS Verification
+
+To connect to registries with self-signed certificates, use the `--registry-insecure` flag:
+
+```bash
+prowler image --registry internal-registry.local --registry-insecure
+```
+
+
+Skipping TLS verification disables certificate validation for registry connections. Use this flag only for trusted internal registries with self-signed certificates.
+
+
+#### Supported Registries
+
+Registry Scan Mode supports the following registry types:
+
+* **OCI-compatible registries:** Any registry implementing the OCI Distribution Specification (e.g., Harbor, GitLab Container Registry, GitHub Container Registry).
+* **Docker Hub:** Specify a namespace with `--registry docker.io/{org_or_user}`. Public namespaces can be scanned without credentials; authenticated access is used automatically when `REGISTRY_USERNAME` and `REGISTRY_PASSWORD` are set.
+* **Amazon ECR:** Use the full ECR endpoint URL (e.g., `123456789.dkr.ecr.us-east-1.amazonaws.com`). Authentication is handled via AWS credentials.
+
+### Authentication for Private Registries
+
+To scan images from private registries, the Image provider supports three authentication methods. Prowler uses the first available method in this priority order:
+
+#### 1. Basic Authentication (Environment Variables)
+
+To authenticate with a username and password, set the `REGISTRY_USERNAME` and `REGISTRY_PASSWORD` environment variables. Prowler automatically runs `docker login`, pulls the image, and performs a `docker logout` after the scan completes:
+
+```bash
+export REGISTRY_USERNAME="myuser"
+export REGISTRY_PASSWORD="mypassword"
-# Then scan the image
prowler image -I myregistry.io/myapp:v1.0
```
-Trivy automatically uses credentials from Docker's credential store (`~/.docker/config.json`).
+Both variables must be set for this method to activate. Prowler handles the full lifecycle — login, pull, scan, and cleanup — without any manual Docker commands.
+
+#### 2. Token-Based Authentication
+
+To authenticate using a registry token (such as a bearer or OAuth2 token), set the `REGISTRY_TOKEN` environment variable. Prowler passes the token directly to Trivy:
+
+```bash
+export REGISTRY_TOKEN="my-registry-token"
+
+prowler image -I myregistry.io/myapp:v1.0
+```
+
+This method is useful for registries that support token-based access without requiring a username and password.
+
+#### 3. Manual Docker Login (Fallback)
+
+If no environment variables are set, Prowler relies on existing credentials in Docker's credential store (`~/.docker/config.json`). To configure credentials manually before scanning:
+
+```bash
+docker login myregistry.io
+
+prowler image -I myregistry.io/myapp:v1.0
+```
+
+
+When basic authentication is active (method 1), Prowler automatically logs out from all authenticated registries after the scan completes. Manual `docker login` sessions (method 3) are not affected by this cleanup.
+
### Troubleshooting Common Scan Errors
The Image provider categorizes common Trivy errors with actionable guidance:
-* **Authentication failure (401/403):** Registry credentials are missing or invalid. Run `docker login` for the target registry and retry the scan.
+* **Authentication failure (401/403):** Registry credentials are missing or invalid. Verify the `REGISTRY_USERNAME`/`REGISTRY_PASSWORD` or `REGISTRY_TOKEN` environment variables, or run `docker login` for the target registry and retry the scan.
* **Image not found (404):** The specified image name, tag, or registry is incorrect. Verify the image reference exists and is accessible.
* **Rate limited (429):** The container registry is throttling requests. Wait before retrying, or authenticate to increase rate limits.
* **Network issue:** Trivy cannot reach the registry due to connectivity problems. Check network access, DNS resolution, and firewall rules.
diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md
index 4bfea57e81..a372096834 100644
--- a/prowler/CHANGELOG.md
+++ b/prowler/CHANGELOG.md
@@ -20,6 +20,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- OpenStack compute 7 new checks [(#9944)](https://github.com/prowler-cloud/prowler/pull/9944)
- CSA CCM 4.0 for the Alibaba Cloud provider [(#10061)](https://github.com/prowler-cloud/prowler/pull/10061)
- ECS Exec (ECS-006) privilege escalation detection via `ecs:ExecuteCommand` + `ecs:DescribeTasks` [(#10066)](https://github.com/prowler-cloud/prowler/pull/10066)
+- Registry scan mode for `image` provider: enumerate and scan all images from OCI standard, Docker Hub, and ECR [(#9985)](https://github.com/prowler-cloud/prowler/pull/9985)
- Add file descriptor limits (`ulimits`) to Docker Compose worker services to prevent `Too many open files` errors [(#10107)](https://github.com/prowler-cloud/prowler/pull/10107)
### 🔄 Changed
diff --git a/prowler/providers/common/provider.py b/prowler/providers/common/provider.py
index 6f29c0b800..e43fa71c7d 100644
--- a/prowler/providers/common/provider.py
+++ b/prowler/providers/common/provider.py
@@ -285,6 +285,19 @@ class Provider(ABC):
timeout=arguments.timeout,
config_path=arguments.config_file,
fixer_config=fixer_config,
+ registry_username=getattr(arguments, "registry_username", None),
+ registry_password=getattr(arguments, "registry_password", None),
+ registry_token=getattr(arguments, "registry_token", None),
+ registry=getattr(arguments, "registry", None),
+ image_filter=getattr(arguments, "image_filter", None),
+ tag_filter=getattr(arguments, "tag_filter", None),
+ max_images=getattr(arguments, "max_images", 0),
+ registry_insecure=getattr(
+ arguments, "registry_insecure", False
+ ),
+ registry_list_images=getattr(
+ arguments, "registry_list_images", False
+ ),
)
elif "mongodbatlas" in provider_class_name.lower():
provider_class(
diff --git a/prowler/providers/image/exceptions/exceptions.py b/prowler/providers/image/exceptions/exceptions.py
index 0e115a83ec..387b443ce3 100644
--- a/prowler/providers/image/exceptions/exceptions.py
+++ b/prowler/providers/image/exceptions/exceptions.py
@@ -50,12 +50,32 @@ class ImageBaseException(ProwlerException):
"message": "Invalid image config scanner type.",
"remediation": "Use valid image config scanners: misconfig, secret.",
},
+ (11013, "ImageRegistryAuthError"): {
+ "message": "Registry authentication failed.",
+ "remediation": "Check REGISTRY_USERNAME/REGISTRY_PASSWORD or REGISTRY_TOKEN environment variables.",
+ },
+ (11014, "ImageRegistryCatalogError"): {
+ "message": "Registry does not support catalog listing.",
+ "remediation": "Use --image or --image-list instead of --registry.",
+ },
+ (11015, "ImageRegistryNetworkError"): {
+ "message": "Network error communicating with registry.",
+ "remediation": "Check registry URL and network connectivity.",
+ },
+ (11016, "ImageMaxImagesExceededError"): {
+ "message": "Discovered images exceed --max-images limit.",
+ "remediation": "Use --image-filter or --tag-filter to narrow results, or increase --max-images.",
+ },
+ (11017, "ImageInvalidFilterError"): {
+ "message": "Invalid regex filter pattern.",
+ "remediation": "Check the regex syntax for --image-filter or --tag-filter.",
+ },
}
def __init__(self, code, file=None, original_exception=None, message=None):
error_info = self.IMAGE_ERROR_CODES.get((code, self.__class__.__name__))
- if message:
- error_info["message"] = message
+ if error_info and message:
+ error_info = {**error_info, "message": message}
super().__init__(
code,
source="Image",
@@ -162,3 +182,48 @@ class ImageInvalidConfigScannerError(ImageBaseException):
super().__init__(
11010, file=file, original_exception=original_exception, message=message
)
+
+
+class ImageRegistryAuthError(ImageBaseException):
+ """Exception raised when registry authentication fails."""
+
+ def __init__(self, file=None, original_exception=None, message=None):
+ super().__init__(
+ 11013, file=file, original_exception=original_exception, message=message
+ )
+
+
+class ImageRegistryCatalogError(ImageBaseException):
+ """Exception raised when registry does not support catalog listing."""
+
+ def __init__(self, file=None, original_exception=None, message=None):
+ super().__init__(
+ 11014, file=file, original_exception=original_exception, message=message
+ )
+
+
+class ImageRegistryNetworkError(ImageBaseException):
+ """Exception raised when a network error occurs communicating with a registry."""
+
+ def __init__(self, file=None, original_exception=None, message=None):
+ super().__init__(
+ 11015, file=file, original_exception=original_exception, message=message
+ )
+
+
+class ImageMaxImagesExceededError(ImageBaseException):
+ """Exception raised when discovered images exceed --max-images limit."""
+
+ def __init__(self, file=None, original_exception=None, message=None):
+ super().__init__(
+ 11016, file=file, original_exception=original_exception, message=message
+ )
+
+
+class ImageInvalidFilterError(ImageBaseException):
+ """Exception raised when an invalid regex filter pattern is provided."""
+
+ def __init__(self, file=None, original_exception=None, message=None):
+ super().__init__(
+ 11017, file=file, original_exception=original_exception, message=message
+ )
diff --git a/prowler/providers/image/image_provider.py b/prowler/providers/image/image_provider.py
index 1e11651253..15f4e14093 100644
--- a/prowler/providers/image/image_provider.py
+++ b/prowler/providers/image/image_provider.py
@@ -1,6 +1,7 @@
from __future__ import annotations
import json
+import os
import re
import subprocess
import sys
@@ -21,12 +22,14 @@ from prowler.providers.common.provider import Provider
from prowler.providers.image.exceptions.exceptions import (
ImageFindingProcessingError,
ImageInvalidConfigScannerError,
+ ImageInvalidFilterError,
ImageInvalidNameError,
ImageInvalidScannerError,
ImageInvalidSeverityError,
ImageInvalidTimeoutError,
ImageListFileNotFoundError,
ImageListFileReadError,
+ ImageMaxImagesExceededError,
ImageNoImagesProvidedError,
ImageScanError,
ImageTrivyBinaryNotFoundError,
@@ -36,6 +39,8 @@ from prowler.providers.image.lib.arguments.arguments import (
SCANNERS_CHOICES,
SEVERITY_CHOICES,
)
+from prowler.providers.image.lib.registry.dockerhub_adapter import DockerHubAdapter
+from prowler.providers.image.lib.registry.factory import create_registry_adapter
class ImageProvider(Provider):
@@ -66,6 +71,15 @@ class ImageProvider(Provider):
config_path: str | None = None,
config_content: dict | None = None,
fixer_config: dict | None = None,
+ registry_username: str | None = None,
+ registry_password: str | None = None,
+ registry_token: str | None = None,
+ registry: str | None = None,
+ image_filter: str | None = None,
+ tag_filter: str | None = None,
+ max_images: int = 0,
+ registry_insecure: bool = False,
+ registry_list_images: bool = False,
):
logger.info("Instantiating Image Provider...")
@@ -82,7 +96,52 @@ class ImageProvider(Provider):
self.audited_account = "image-scan"
self._session = None
self._identity = "prowler"
- self._auth_method = "No auth"
+
+ # Registry authentication (follows IaC pattern: explicit params, env vars internal)
+ self.registry_username = registry_username or os.environ.get(
+ "REGISTRY_USERNAME"
+ )
+ self.registry_password = registry_password or os.environ.get(
+ "REGISTRY_PASSWORD"
+ )
+ self.registry_token = registry_token or os.environ.get("REGISTRY_TOKEN")
+
+ if self.registry_username and self.registry_password:
+ self._auth_method = "Basic auth"
+ logger.info("Using basic auth for registry authentication")
+ elif self.registry_token:
+ self._auth_method = "Registry token"
+ logger.info("Using registry token for authentication")
+ else:
+ self._auth_method = "No auth"
+
+ # Registry scan mode
+ self.registry = registry
+ self.image_filter = image_filter
+ self.tag_filter = tag_filter
+ self.max_images = max_images
+ self.registry_insecure = registry_insecure
+ self.registry_list_images = registry_list_images
+
+ # Compile regex filters
+ self._image_filter_re = None
+ self._tag_filter_re = None
+ if self.image_filter:
+ try:
+ self._image_filter_re = re.compile(self.image_filter)
+ except re.error as exc:
+ raise ImageInvalidFilterError(
+ file=__file__,
+ message=f"Invalid --image-filter regex '{self.image_filter}': {exc}",
+ )
+ if self.tag_filter:
+ try:
+ self._tag_filter_re = re.compile(self.tag_filter)
+ except re.error as exc:
+ raise ImageInvalidFilterError(
+ file=__file__,
+ message=f"Invalid --tag-filter regex '{self.tag_filter}': {exc}",
+ )
self._validate_inputs()
@@ -90,6 +149,10 @@ class ImageProvider(Provider):
if image_list_file:
self._load_images_from_file(image_list_file)
+ # Registry scan mode: enumerate images from registry
+ if self.registry:
+ self._enumerate_registry()
+
for image in self.images:
self._validate_image_name(image)
@@ -245,6 +308,20 @@ class ImageProvider(Provider):
"""Image provider doesn't need a session since it uses Trivy directly"""
return None
+ @staticmethod
+ def _extract_registry(image: str) -> str | None:
+ """Extract registry hostname from an image reference.
+
+ Returns None for Docker Hub images (no registry prefix).
+ """
+ parts = image.split("/")
+ if len(parts) >= 2 and ("." in parts[0] or ":" in parts[0]):
+ return parts[0]
+ return None
+
+ def cleanup(self) -> None:
+ """Clean up any resources after scanning."""
+
def _process_finding(
self, finding: dict, image_name: str, finding_type: str
) -> CheckReportImage:
@@ -368,10 +445,13 @@ class ImageProvider(Provider):
def run(self) -> list[CheckReportImage]:
"""Execute the container image scan."""
- reports = []
- for batch in self.run_scan():
- reports.extend(batch)
- return reports
+ try:
+ reports = []
+ for batch in self.run_scan():
+ reports.extend(batch)
+ return reports
+ finally:
+ self.cleanup()
def run_scan(self) -> Generator[list[CheckReportImage], None, None]:
"""
@@ -507,8 +587,19 @@ class ImageProvider(Provider):
)
logger.error(f"Error scanning image {image}: {error}")
+ def _build_trivy_env(self) -> dict:
+ """Build environment variables for Trivy, injecting registry credentials."""
+ env = dict(os.environ)
+ if self.registry_username and self.registry_password:
+ env["TRIVY_USERNAME"] = self.registry_username
+ env["TRIVY_PASSWORD"] = self.registry_password
+ elif self.registry_token:
+ env["TRIVY_REGISTRY_TOKEN"] = self.registry_token
+ return env
+
def _execute_trivy(self, command: list, image: str) -> subprocess.CompletedProcess:
"""Execute Trivy command with optional progress bar."""
+ env = self._build_trivy_env()
try:
if sys.stdout.isatty():
with alive_bar(
@@ -523,6 +614,7 @@ class ImageProvider(Provider):
command,
capture_output=True,
text=True,
+ env=env,
)
bar.title = f"-> Scan completed for {image}"
return process
@@ -532,12 +624,13 @@ class ImageProvider(Provider):
command,
capture_output=True,
text=True,
+ env=env,
)
logger.info(f"Scan completed for {image}")
return process
except (AttributeError, OSError):
logger.info(f"Scanning {image}...")
- return subprocess.run(command, capture_output=True, text=True)
+ return subprocess.run(command, capture_output=True, text=True, env=env)
def _log_trivy_stderr(self, stderr: str) -> None:
"""Parse and log Trivy's stderr output."""
@@ -586,7 +679,7 @@ class ImageProvider(Provider):
lower = error_msg.lower()
if any(kw in lower for kw in ("401", "403", "unauthorized", "denied")):
- return f"Auth failure — check `docker login`: {error_msg}"
+ return f"Auth failure — check registry credentials: {error_msg}"
if any(kw in lower for kw in ("404", "manifest unknown", "not found")):
return f"Image not found — check name/tag/registry: {error_msg}"
if any(kw in lower for kw in ("429", "rate limit", "too many requests")):
@@ -596,6 +689,104 @@ class ImageProvider(Provider):
return error_msg
+ def _enumerate_registry(self) -> None:
+ """Enumerate images from a registry using the appropriate adapter."""
+ verify_ssl = not self.registry_insecure
+ adapter = create_registry_adapter(
+ registry_url=self.registry,
+ username=self.registry_username,
+ password=self.registry_password,
+ token=self.registry_token,
+ verify_ssl=verify_ssl,
+ )
+
+ repositories = adapter.list_repositories()
+ logger.info(
+ f"Discovered {len(repositories)} repositories from registry {self.registry}"
+ )
+
+ # Apply image filter
+ if self._image_filter_re:
+ repositories = [r for r in repositories if self._image_filter_re.search(r)]
+ logger.info(
+ f"{len(repositories)} repositories match --image-filter '{self.image_filter}'"
+ )
+
+ if not repositories:
+ logger.warning(
+ f"No repositories found in registry {self.registry} (after filtering)"
+ )
+ return
+
+ # Determine if this is a Docker Hub adapter (for image reference format)
+ is_dockerhub = isinstance(adapter, DockerHubAdapter)
+
+ discovered_images = []
+ repos_tags: dict[str, list[str]] = {}
+ for repo in repositories:
+ tags = adapter.list_tags(repo)
+
+ # Apply tag filter
+ if self._tag_filter_re:
+ tags = [t for t in tags if self._tag_filter_re.search(t)]
+
+ if tags:
+ repos_tags[repo] = tags
+
+ for tag in tags:
+ if is_dockerhub:
+ # Docker Hub images don't need a host prefix
+ image_ref = f"{repo}:{tag}"
+ else:
+ # OCI registries need the full host/repo:tag reference
+ registry_host = self.registry.rstrip("/")
+ for prefix in ("https://", "http://"):
+ if registry_host.startswith(prefix):
+ registry_host = registry_host[len(prefix) :]
+ break
+ image_ref = f"{registry_host}/{repo}:{tag}"
+ discovered_images.append(image_ref)
+
+ # Registry list mode: print listing and exit
+ if self.registry_list_images:
+ self._print_registry_listing(repos_tags, len(discovered_images))
+ raise SystemExit(0)
+
+ # Check max-images limit
+ if self.max_images and len(discovered_images) > self.max_images:
+ raise ImageMaxImagesExceededError(
+ file=__file__,
+ message=f"Discovered {len(discovered_images)} images, exceeding --max-images {self.max_images}. Use --image-filter or --tag-filter to narrow results.",
+ )
+
+ # Deduplicate with explicit images
+ existing = set(self.images)
+ for img in discovered_images:
+ if img not in existing:
+ self.images.append(img)
+ existing.add(img)
+
+ logger.info(
+ f"Discovered {len(discovered_images)} images from registry {self.registry} "
+ f"({len(repositories)} repositories). Total images to scan: {len(self.images)}"
+ )
+
+ def _print_registry_listing(
+ self, repos_tags: dict[str, list[str]], total_images: int
+ ) -> None:
+ """Print a structured listing of registry repositories and tags."""
+ num_repos = len(repos_tags)
+ print(
+ f"\n{Style.BRIGHT}Registry:{Style.RESET_ALL} "
+ f"{Fore.CYAN}{self.registry}{Style.RESET_ALL} "
+ f"({num_repos} {'repository' if num_repos == 1 else 'repositories'}, "
+ f"{total_images} {'image' if total_images == 1 else 'images'})\n"
+ )
+ for repo, tags in repos_tags.items():
+ print(f" {Fore.YELLOW}{repo}{Style.RESET_ALL} " f"({len(tags)} tags)")
+ print(f" {', '.join(tags)}")
+ print()
+
def print_credentials(self) -> None:
"""Print scan configuration."""
report_title = f"{Style.BRIGHT}Scanning container images:{Style.RESET_ALL}"
@@ -628,6 +819,23 @@ class ImageProvider(Provider):
report_lines.append(f"Timeout: {Fore.YELLOW}{self.timeout}{Style.RESET_ALL}")
+ report_lines.append(
+ f"Authentication method: {Fore.YELLOW}{self.auth_method}{Style.RESET_ALL}"
+ )
+
+ if self.registry:
+ report_lines.append(
+ f"Registry: {Fore.YELLOW}{self.registry}{Style.RESET_ALL}"
+ )
+ if self.image_filter:
+ report_lines.append(
+ f"Image filter: {Fore.YELLOW}{self.image_filter}{Style.RESET_ALL}"
+ )
+ if self.tag_filter:
+ report_lines.append(
+ f"Tag filter: {Fore.YELLOW}{self.tag_filter}{Style.RESET_ALL}"
+ )
+
print_boxes(report_lines, report_title)
@staticmethod
@@ -635,6 +843,9 @@ class ImageProvider(Provider):
image: str | None = None,
raise_on_exception: bool = True,
provider_id: str | None = None,
+ registry_username: str | None = None,
+ registry_password: str | None = None,
+ registry_token: str | None = None,
) -> "Connection":
"""
Test connection to container registry by attempting to inspect an image.
@@ -643,6 +854,9 @@ class ImageProvider(Provider):
image: Container image to test
raise_on_exception: Whether to raise exceptions
provider_id: Fallback for image name
+ registry_username: Registry username for basic auth
+ registry_password: Registry password for basic auth
+ registry_token: Registry token for token-based auth
Returns:
Connection: Connection object with success status
@@ -654,6 +868,14 @@ class ImageProvider(Provider):
if not image:
return Connection(is_connected=False, error="Image name is required")
+ # Build env with registry credentials
+ env = dict(os.environ)
+ if registry_username and registry_password:
+ env["TRIVY_USERNAME"] = registry_username
+ env["TRIVY_PASSWORD"] = registry_password
+ elif registry_token:
+ env["TRIVY_REGISTRY_TOKEN"] = registry_token
+
# Test by running trivy with --skip-update to just test image access
process = subprocess.run(
[
@@ -666,6 +888,7 @@ class ImageProvider(Provider):
capture_output=True,
text=True,
timeout=60,
+ env=env,
)
if process.returncode == 0:
diff --git a/prowler/providers/image/lib/arguments/arguments.py b/prowler/providers/image/lib/arguments/arguments.py
index 3809aec667..3dfe9cf92b 100644
--- a/prowler/providers/image/lib/arguments/arguments.py
+++ b/prowler/providers/image/lib/arguments/arguments.py
@@ -88,16 +88,96 @@ def init_parser(self):
help="Trivy scan timeout. Default: 5m. Examples: 10m, 1h",
)
+ # Registry Scan Mode
+ registry_group = image_parser.add_argument_group("Registry Scan Mode")
+ registry_group.add_argument(
+ "--registry",
+ dest="registry",
+ default=None,
+ help="Registry URL to enumerate and scan all images. Examples: myregistry.io, docker.io/myorg, 123456789.dkr.ecr.us-east-1.amazonaws.com",
+ )
+ registry_group.add_argument(
+ "--image-filter",
+ dest="image_filter",
+ default=None,
+ help="Regex to filter repository names during registry enumeration (re.search). Example: '^prod/.*'",
+ )
+ registry_group.add_argument(
+ "--tag-filter",
+ dest="tag_filter",
+ default=None,
+ help=r"Regex to filter tags during registry enumeration (re.search). Example: '^(latest|v\d+\.\d+\.\d+)$'",
+ )
+ registry_group.add_argument(
+ "--max-images",
+ dest="max_images",
+ type=int,
+ default=0,
+ help="Maximum number of images to scan from registry. 0 = unlimited. Aborts if exceeded.",
+ )
+ registry_group.add_argument(
+ "--registry-insecure",
+ dest="registry_insecure",
+ action="store_true",
+ default=False,
+ help="Skip TLS verification for registry connections (for self-signed certificates).",
+ )
+ registry_group.add_argument(
+ "--registry-list",
+ dest="registry_list_images",
+ action="store_true",
+ default=False,
+ help="List all repositories and tags from the registry, then exit without scanning. Useful for discovering available images before building --image-filter or --tag-filter.",
+ )
+
def validate_arguments(arguments):
"""Validate Image provider arguments."""
images = getattr(arguments, "images", [])
image_list_file = getattr(arguments, "image_list_file", None)
+ registry = getattr(arguments, "registry", None)
+ image_filter = getattr(arguments, "image_filter", None)
+ tag_filter = getattr(arguments, "tag_filter", None)
+ max_images = getattr(arguments, "max_images", 0)
+ registry_insecure = getattr(arguments, "registry_insecure", False)
+ registry_list_images = getattr(arguments, "registry_list_images", False)
- if not images and not image_list_file:
+ if registry_list_images and not registry:
+ return (False, "--registry-list requires --registry.")
+
+ if not images and not image_list_file and not registry:
return (
False,
- "At least one image must be specified using --image (-I) or --image-list.",
+ "At least one image source must be specified using --image (-I), --image-list, or --registry.",
)
+ # Registry-only flags require --registry
+ if not registry:
+ if image_filter:
+ return (False, "--image-filter requires --registry.")
+ if tag_filter:
+ return (False, "--tag-filter requires --registry.")
+ if max_images:
+ return (False, "--max-images requires --registry.")
+ if registry_insecure:
+ return (False, "--registry-insecure requires --registry.")
+
+ # Docker Hub namespace validation
+ if registry:
+ url = registry.rstrip("/")
+ for prefix in ("https://", "http://"):
+ if url.startswith(prefix):
+ url = url[len(prefix) :]
+ break
+ stripped = url
+ for prefix in ("registry-1.docker.io", "docker.io"):
+ if stripped.startswith(prefix):
+ stripped = stripped[len(prefix) :].lstrip("/")
+ if not stripped:
+ return (
+ False,
+ "Docker Hub requires a namespace. Use --registry docker.io/{org_or_user}.",
+ )
+ break
+
return (True, "")
diff --git a/prowler/providers/image/lib/registry/__init__.py b/prowler/providers/image/lib/registry/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/prowler/providers/image/lib/registry/base.py b/prowler/providers/image/lib/registry/base.py
new file mode 100644
index 0000000000..2e07fcde93
--- /dev/null
+++ b/prowler/providers/image/lib/registry/base.py
@@ -0,0 +1,124 @@
+"""Registry adapter abstract base class."""
+
+from __future__ import annotations
+
+import re
+import time
+from abc import ABC, abstractmethod
+from urllib.parse import urlparse
+
+import requests
+
+from prowler.lib.logger import logger
+from prowler.providers.image.exceptions.exceptions import ImageRegistryNetworkError
+
+_MAX_RETRIES = 3
+_BACKOFF_BASE = 1
+
+
+class RegistryAdapter(ABC):
+ """Abstract base class for registry adapters."""
+
+ def __init__(
+ self,
+ registry_url: str,
+ username: str | None = None,
+ password: str | None = None,
+ token: str | None = None,
+ verify_ssl: bool = True,
+ ) -> None:
+ self.registry_url = registry_url
+ self.username = username
+ self._password = password
+ self._token = token
+ self.verify_ssl = verify_ssl
+
+ @property
+ def password(self) -> str | None:
+ return self._password
+
+ @property
+ def token(self) -> str | None:
+ return self._token
+
+ def __getstate__(self) -> dict:
+ state = self.__dict__.copy()
+ state["_password"] = "***" if state.get("_password") else None
+ state["_token"] = "***" if state.get("_token") else None
+ return state
+
+ def __repr__(self) -> str:
+ return (
+ f"{self.__class__.__name__}("
+ f"registry_url={self.registry_url!r}, "
+ f"username={self.username!r}, "
+ f"password={'' if self._password else None}, "
+ f"token={'' if self._token else None})"
+ )
+
+ @abstractmethod
+ def list_repositories(self) -> list[str]:
+ """Enumerate all repository names in the registry."""
+ ...
+
+ @abstractmethod
+ def list_tags(self, repository: str) -> list[str]:
+ """Enumerate all tags for a repository."""
+ ...
+
+ def _request_with_retry(self, method: str, url: str, **kwargs) -> requests.Response:
+ context_label = kwargs.pop("context_label", None) or self.registry_url
+ kwargs.setdefault("timeout", 30)
+ kwargs.setdefault("verify", self.verify_ssl)
+ last_exception = None
+ last_status = None
+ for attempt in range(1, _MAX_RETRIES + 1):
+ try:
+ resp = requests.request(method, url, **kwargs)
+ if resp.status_code == 429:
+ last_status = 429
+ wait = _BACKOFF_BASE * (2 ** (attempt - 1))
+ logger.warning(
+ f"Rate limited by {context_label}, retrying in {wait}s (attempt {attempt}/{_MAX_RETRIES})"
+ )
+ time.sleep(wait)
+ continue
+ return resp
+ except requests.exceptions.ConnectionError as exc:
+ last_exception = exc
+ if attempt < _MAX_RETRIES:
+ wait = _BACKOFF_BASE * (2 ** (attempt - 1))
+ logger.warning(
+ f"Connection error to {context_label}, retrying in {wait}s (attempt {attempt}/{_MAX_RETRIES})"
+ )
+ time.sleep(wait)
+ continue
+ except requests.exceptions.Timeout as exc:
+ raise ImageRegistryNetworkError(
+ file=__file__,
+ message=f"Connection timed out to {context_label}.",
+ original_exception=exc,
+ )
+ if last_status == 429:
+ raise ImageRegistryNetworkError(
+ file=__file__,
+ message=f"Rate limited by {context_label} after {_MAX_RETRIES} attempts.",
+ )
+ raise ImageRegistryNetworkError(
+ file=__file__,
+ message=f"Failed to connect to {context_label} after {_MAX_RETRIES} attempts.",
+ original_exception=last_exception,
+ )
+
+ def _next_page_url(self, resp: requests.Response) -> str | None:
+ link_header = resp.headers.get("Link", "")
+ if not link_header:
+ return None
+ match = re.search(r'<([^>]+)>;\s*rel="next"', link_header)
+ if not match:
+ return None
+ url = match.group(1)
+ if url.startswith("/"):
+ parsed = urlparse(resp.url)
+ return f"{parsed.scheme}://{parsed.netloc}{url}"
+ return url
diff --git a/prowler/providers/image/lib/registry/dockerhub_adapter.py b/prowler/providers/image/lib/registry/dockerhub_adapter.py
new file mode 100644
index 0000000000..6b3df09818
--- /dev/null
+++ b/prowler/providers/image/lib/registry/dockerhub_adapter.py
@@ -0,0 +1,215 @@
+"""Docker Hub registry adapter."""
+
+from __future__ import annotations
+
+import re
+from typing import TYPE_CHECKING
+
+from prowler.lib.logger import logger
+from prowler.providers.image.exceptions.exceptions import (
+ ImageRegistryAuthError,
+ ImageRegistryCatalogError,
+ ImageRegistryNetworkError,
+)
+from prowler.providers.image.lib.registry.base import RegistryAdapter
+
+if TYPE_CHECKING:
+ import requests
+
+_HUB_API = "https://hub.docker.com"
+_REGISTRY_HOST = "https://registry-1.docker.io"
+_AUTH_URL = "https://auth.docker.io/token"
+
+
+class DockerHubAdapter(RegistryAdapter):
+ """Adapter for Docker Hub using the Hub REST API + OCI tag listing."""
+
+ def __init__(
+ self,
+ registry_url: str,
+ username: str | None = None,
+ password: str | None = None,
+ token: str | None = None,
+ verify_ssl: bool = True,
+ ) -> None:
+ if not verify_ssl:
+ logger.warning(
+ "Docker Hub always uses TLS verification; --registry-insecure is ignored for Docker Hub registries."
+ )
+ super().__init__(registry_url, username, password, token, verify_ssl=True)
+ self.namespace = self._extract_namespace(registry_url)
+ self._hub_jwt: str | None = None
+ self._registry_tokens: dict[str, str] = {}
+
+ @staticmethod
+ def _extract_namespace(registry_url: str) -> str:
+ url = registry_url.rstrip("/")
+ for prefix in (
+ "https://registry-1.docker.io",
+ "http://registry-1.docker.io",
+ "https://docker.io",
+ "http://docker.io",
+ "registry-1.docker.io",
+ "docker.io",
+ "https://",
+ "http://",
+ ):
+ if url.startswith(prefix):
+ url = url[len(prefix) :]
+ break
+ url = url.lstrip("/")
+ parts = url.split("/")
+ namespace = parts[0] if parts and parts[0] else ""
+ return namespace
+
+ def list_repositories(self) -> list[str]:
+ if not self.namespace:
+ raise ImageRegistryCatalogError(
+ file=__file__,
+ message="Docker Hub requires a namespace. Use --registry docker.io/{org_or_user}.",
+ )
+ self._hub_login()
+ repositories: list[str] = []
+ if self._hub_jwt:
+ url = f"{_HUB_API}/v2/namespaces/{self.namespace}/repositories"
+ else:
+ url = f"{_HUB_API}/v2/repositories/{self.namespace}/"
+ params: dict = {"page_size": 100}
+ while url:
+ resp = self._hub_request("GET", url, params=params)
+ self._check_hub_response(resp, "repository listing")
+ data = resp.json()
+ for repo in data.get("results", []):
+ name = repo.get("name", "")
+ if name:
+ repositories.append(f"{self.namespace}/{name}")
+ url = data.get("next")
+ params = {}
+ return repositories
+
+ def list_tags(self, repository: str) -> list[str]:
+ token = self._get_registry_token(repository)
+ tags: list[str] = []
+ url = f"{_REGISTRY_HOST}/v2/{repository}/tags/list"
+ params: dict = {"n": 100}
+ while url:
+ resp = self._registry_request("GET", url, token, params=params)
+ if resp.status_code in (401, 403):
+ raise ImageRegistryAuthError(
+ file=__file__,
+ message=f"Authentication failed for tag listing of {repository} on Docker Hub. Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
+ )
+ if resp.status_code != 200:
+ logger.warning(
+ f"Failed to list tags for {repository} (HTTP {resp.status_code}): {resp.text[:200]}"
+ )
+ break
+ data = resp.json()
+ tags.extend(data.get("tags", []) or [])
+ url = self._next_tag_page_url(resp)
+ params = {}
+ return tags
+
+ def _hub_login(self) -> None:
+ if self._hub_jwt:
+ return
+ if not self.username or not self.password:
+ return
+ resp = self._request_with_retry(
+ "POST",
+ f"{_HUB_API}/v2/users/login",
+ json={"username": self.username, "password": self.password},
+ context_label="Docker Hub",
+ )
+ if resp.status_code != 200:
+ raise ImageRegistryAuthError(
+ file=__file__,
+ message=f"Docker Hub login failed (HTTP {resp.status_code}). Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
+ )
+ self._hub_jwt = resp.json().get("token")
+ if not self._hub_jwt:
+ raise ImageRegistryAuthError(
+ file=__file__,
+ message="Docker Hub login returned an empty JWT token. Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
+ )
+
+ def _get_registry_token(self, repository: str) -> str:
+ if repository in self._registry_tokens:
+ return self._registry_tokens[repository]
+ params = {
+ "service": "registry.docker.io",
+ "scope": f"repository:{repository}:pull",
+ }
+ auth = None
+ if self.username and self.password:
+ auth = (self.username, self.password)
+ resp = self._request_with_retry(
+ "GET",
+ _AUTH_URL,
+ params=params,
+ auth=auth,
+ context_label="Docker Hub",
+ )
+ if resp.status_code != 200:
+ raise ImageRegistryAuthError(
+ file=__file__,
+ message=f"Failed to obtain Docker Hub registry token for {repository} (HTTP {resp.status_code}). Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
+ )
+ token = resp.json().get("token", "")
+ if not token:
+ raise ImageRegistryAuthError(
+ file=__file__,
+ message=f"Docker Hub registry token endpoint returned an empty token for {repository}. Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
+ )
+ self._registry_tokens[repository] = token
+ return token
+
+ def _hub_request(self, method: str, url: str, **kwargs) -> requests.Response:
+ headers = kwargs.pop("headers", {})
+ if self._hub_jwt:
+ headers["Authorization"] = f"Bearer {self._hub_jwt}"
+ kwargs["headers"] = headers
+ return self._request_with_retry(
+ method, url, context_label="Docker Hub", **kwargs
+ )
+
+ def _registry_request(
+ self, method: str, url: str, token: str, **kwargs
+ ) -> requests.Response:
+ headers = kwargs.pop("headers", {})
+ headers["Authorization"] = f"Bearer {token}"
+ kwargs["headers"] = headers
+ return self._request_with_retry(
+ method, url, context_label="Docker Hub", **kwargs
+ )
+
+ def _check_hub_response(self, resp: requests.Response, context: str) -> None:
+ if resp.status_code == 200:
+ return
+ if resp.status_code in (401, 403):
+ raise ImageRegistryAuthError(
+ file=__file__,
+ message=f"Authentication failed for {context} on Docker Hub (HTTP {resp.status_code}). Check REGISTRY_USERNAME and REGISTRY_PASSWORD environment variables.",
+ )
+ if resp.status_code == 404:
+ raise ImageRegistryCatalogError(
+ file=__file__,
+ message=f"Namespace '{self.namespace}' not found on Docker Hub. Check the namespace in --registry docker.io/{{namespace}}.",
+ )
+ raise ImageRegistryNetworkError(
+ file=__file__,
+ message=f"Unexpected error during {context} on Docker Hub (HTTP {resp.status_code}): {resp.text[:200]}",
+ )
+
+ @staticmethod
+ def _next_tag_page_url(resp: requests.Response) -> str | None:
+ link_header = resp.headers.get("Link", "")
+ if not link_header:
+ return None
+ match = re.search(r'<([^>]+)>;\s*rel="next"', link_header)
+ if match:
+ next_url = match.group(1)
+ if next_url.startswith("/"):
+ return f"{_REGISTRY_HOST}{next_url}"
+ return next_url
+ return None
diff --git a/prowler/providers/image/lib/registry/factory.py b/prowler/providers/image/lib/registry/factory.py
new file mode 100644
index 0000000000..5c0134fedd
--- /dev/null
+++ b/prowler/providers/image/lib/registry/factory.py
@@ -0,0 +1,40 @@
+"""Factory for auto-detecting registry type and returning the appropriate adapter."""
+
+from __future__ import annotations
+
+import re
+
+from prowler.providers.image.lib.registry.base import RegistryAdapter
+from prowler.providers.image.lib.registry.dockerhub_adapter import DockerHubAdapter
+from prowler.providers.image.lib.registry.oci_adapter import OciRegistryAdapter
+
+_DOCKER_HUB_PATTERN = re.compile(
+ r"^(https?://)?(docker\.io|registry-1\.docker\.io)(/|$)", re.IGNORECASE
+)
+
+
+def create_registry_adapter(
+ registry_url: str,
+ username: str | None = None,
+ password: str | None = None,
+ token: str | None = None,
+ verify_ssl: bool = True,
+) -> RegistryAdapter:
+ """Auto-detect registry type from URL and return the appropriate adapter."""
+ if _DOCKER_HUB_PATTERN.search(registry_url):
+ return DockerHubAdapter(
+ registry_url=registry_url,
+ username=username,
+ password=password,
+ token=token,
+ verify_ssl=verify_ssl,
+ )
+ # ECR and other non-Docker-Hub registries implement the OCI Distribution Spec,
+ # so they are handled by the generic OCI adapter.
+ return OciRegistryAdapter(
+ registry_url=registry_url,
+ username=username,
+ password=password,
+ token=token,
+ verify_ssl=verify_ssl,
+ )
diff --git a/prowler/providers/image/lib/registry/oci_adapter.py b/prowler/providers/image/lib/registry/oci_adapter.py
new file mode 100644
index 0000000000..b7eb4c780b
--- /dev/null
+++ b/prowler/providers/image/lib/registry/oci_adapter.py
@@ -0,0 +1,228 @@
+"""Generic OCI Distribution Spec registry adapter."""
+
+from __future__ import annotations
+
+import base64
+import ipaddress
+import re
+from typing import TYPE_CHECKING
+from urllib.parse import urlparse
+
+from prowler.lib.logger import logger
+from prowler.providers.image.exceptions.exceptions import (
+ ImageRegistryAuthError,
+ ImageRegistryCatalogError,
+ ImageRegistryNetworkError,
+)
+from prowler.providers.image.lib.registry.base import RegistryAdapter
+
+if TYPE_CHECKING:
+ import requests
+
+
+class OciRegistryAdapter(RegistryAdapter):
+ """Adapter for registries implementing OCI Distribution Spec."""
+
+ def __init__(
+ self,
+ registry_url: str,
+ username: str | None = None,
+ password: str | None = None,
+ token: str | None = None,
+ verify_ssl: bool = True,
+ ) -> None:
+ super().__init__(registry_url, username, password, token, verify_ssl)
+ self._base_url = self._normalise_url(registry_url)
+ self._bearer_token: str | None = None
+ self._basic_auth_verified = False
+
+ @staticmethod
+ def _normalise_url(url: str) -> str:
+ url = url.rstrip("/")
+ if not url.startswith(("http://", "https://")):
+ url = f"https://{url}"
+ return url
+
+ def list_repositories(self) -> list[str]:
+ self._ensure_auth()
+ repositories: list[str] = []
+ url = f"{self._base_url}/v2/_catalog"
+ params: dict = {"n": 200}
+ while url:
+ resp = self._authed_request("GET", url, params=params)
+ if resp.status_code == 404:
+ raise ImageRegistryCatalogError(
+ file=__file__,
+ message=f"Registry at {self.registry_url} does not support catalog listing (/_catalog returned 404). Use --image or --image-list instead.",
+ )
+ self._check_response(resp, "catalog listing")
+ data = resp.json()
+ repositories.extend(data.get("repositories", []))
+ url = self._next_page_url(resp)
+ params = {}
+ return repositories
+
+ def list_tags(self, repository: str) -> list[str]:
+ self._ensure_auth(repository=repository)
+ tags: list[str] = []
+ url = f"{self._base_url}/v2/{repository}/tags/list"
+ params: dict = {"n": 200}
+ while url:
+ resp = self._authed_request("GET", url, params=params)
+ self._check_response(resp, f"tag listing for {repository}")
+ data = resp.json()
+ tags.extend(data.get("tags", []) or [])
+ url = self._next_page_url(resp)
+ params = {}
+ return tags
+
+ def _ensure_auth(self, repository: str | None = None) -> None:
+ if self._bearer_token:
+ return
+ if self._basic_auth_verified:
+ return
+ if self.token:
+ self._bearer_token = self.token
+ return
+ ping_url = f"{self._base_url}/v2/"
+ resp = self._request_with_retry("GET", ping_url)
+ if resp.status_code == 200:
+ return
+ if resp.status_code == 401:
+ www_auth = resp.headers.get("Www-Authenticate", "")
+
+ if not www_auth.lower().startswith("bearer"):
+ # Basic auth challenge (e.g., AWS ECR)
+ if self.username and self.password:
+ self._basic_auth_verified = True
+ return
+ raise ImageRegistryAuthError(
+ file=__file__,
+ message=(
+ f"Registry {self.registry_url} requires authentication "
+ f"but no credentials provided. "
+ f"Set REGISTRY_USERNAME and REGISTRY_PASSWORD."
+ ),
+ )
+
+ # Bearer token exchange (standard OCI flow)
+ self._bearer_token = self._obtain_bearer_token(www_auth, repository)
+ return
+ if resp.status_code == 403:
+ raise ImageRegistryAuthError(
+ file=__file__,
+ message=f"Access denied to registry {self.registry_url} (HTTP 403). Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
+ )
+ raise ImageRegistryNetworkError(
+ file=__file__,
+ message=f"Unexpected HTTP {resp.status_code} from registry {self.registry_url} during auth check.",
+ )
+
+ def _obtain_bearer_token(
+ self, www_authenticate: str, repository: str | None = None
+ ) -> str:
+ match = re.search(r'realm="([^"]+)"', www_authenticate)
+ if not match:
+ raise ImageRegistryAuthError(
+ file=__file__,
+ message=f"Cannot parse token endpoint from registry {self.registry_url}. Www-Authenticate: {www_authenticate[:200]}",
+ )
+ realm = match.group(1)
+ self._validate_realm_url(realm)
+ params: dict = {}
+ service_match = re.search(r'service="([^"]+)"', www_authenticate)
+ if service_match:
+ params["service"] = service_match.group(1)
+ scope_match = re.search(r'scope="([^"]+)"', www_authenticate)
+ if scope_match:
+ params["scope"] = scope_match.group(1)
+ elif repository:
+ params["scope"] = f"repository:{repository}:pull"
+ auth = None
+ if self.username and self.password:
+ auth = (self.username, self.password)
+ resp = self._request_with_retry("GET", realm, params=params, auth=auth)
+ if resp.status_code != 200:
+ raise ImageRegistryAuthError(
+ file=__file__,
+ message=f"Failed to obtain bearer token from {realm} (HTTP {resp.status_code}). Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
+ )
+ data = resp.json()
+ token = data.get("token") or data.get("access_token", "")
+ if not token:
+ raise ImageRegistryAuthError(
+ file=__file__,
+ message=f"Token endpoint {realm} returned an empty token. Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
+ )
+ return token
+
+ @staticmethod
+ def _validate_realm_url(realm: str) -> None:
+ parsed = urlparse(realm)
+ if parsed.scheme not in ("http", "https"):
+ raise ImageRegistryAuthError(
+ file=__file__,
+ message=f"Bearer token realm has disallowed scheme: {parsed.scheme}. Only http/https are allowed.",
+ )
+ if parsed.scheme == "http":
+ logger.warning(f"Bearer token realm uses HTTP (not HTTPS): {realm}")
+ hostname = parsed.hostname or ""
+ try:
+ addr = ipaddress.ip_address(hostname)
+ if addr.is_private or addr.is_loopback or addr.is_link_local:
+ raise ImageRegistryAuthError(
+ file=__file__,
+ message=f"Bearer token realm points to a private/loopback address: {hostname}. This may indicate an SSRF attempt.",
+ )
+ except ValueError:
+ pass
+
+ def _resolve_basic_credentials(self) -> tuple[str | None, str | None]:
+ """Decode pre-encoded base64 auth tokens (e.g., from aws ecr get-authorization-token).
+
+ Returns (username, password) — decoded if the password is a base64 token
+ containing 'username:real_password', otherwise returned as-is.
+ """
+ if not self.password:
+ return self.username, self.password
+ try:
+ decoded = base64.b64decode(self.password).decode("utf-8")
+ if decoded.startswith(f"{self.username}:"):
+ return self.username, decoded[len(self.username) + 1 :]
+ except (ValueError, UnicodeDecodeError):
+ logger.debug("Password is not a base64-encoded auth token, using as-is")
+ return self.username, self.password
+
+ def _authed_request(self, method: str, url: str, **kwargs) -> requests.Response:
+ resp = self._do_authed_request(method, url, **kwargs)
+ if resp.status_code == 401 and self._bearer_token:
+ logger.debug(
+ f"Bearer token rejected (HTTP 401), re-authenticating to {self.registry_url}"
+ )
+ self._bearer_token = None
+ self._ensure_auth()
+ resp = self._do_authed_request(method, url, **kwargs)
+ return resp
+
+ def _do_authed_request(self, method: str, url: str, **kwargs) -> requests.Response:
+ headers = kwargs.pop("headers", {})
+ if self._bearer_token:
+ headers["Authorization"] = f"Bearer {self._bearer_token}"
+ elif self.username and self.password:
+ user, pwd = self._resolve_basic_credentials()
+ kwargs.setdefault("auth", (user, pwd))
+ kwargs["headers"] = headers
+ return self._request_with_retry(method, url, **kwargs)
+
+ def _check_response(self, resp: requests.Response, context: str) -> None:
+ if resp.status_code == 200:
+ return
+ if resp.status_code in (401, 403):
+ raise ImageRegistryAuthError(
+ file=__file__,
+ message=f"Authentication failed for {context} on {self.registry_url} (HTTP {resp.status_code}). Check REGISTRY_USERNAME and REGISTRY_PASSWORD.",
+ )
+ raise ImageRegistryNetworkError(
+ file=__file__,
+ message=f"Unexpected error during {context} on {self.registry_url} (HTTP {resp.status_code}): {resp.text[:200]}",
+ )
diff --git a/tests/providers/image/image_provider_test.py b/tests/providers/image/image_provider_test.py
index 75bebaefc5..32fd92f45a 100644
--- a/tests/providers/image/image_provider_test.py
+++ b/tests/providers/image/image_provider_test.py
@@ -1,3 +1,4 @@
+import os
import tempfile
from unittest import mock
from unittest.mock import MagicMock, patch
@@ -41,6 +42,10 @@ def _make_provider(**kwargs):
class TestImageProvider:
+ @patch.dict(
+ os.environ,
+ {"REGISTRY_USERNAME": "", "REGISTRY_PASSWORD": "", "REGISTRY_TOKEN": ""},
+ )
def test_image_provider(self):
"""Test default initialization."""
provider = _make_provider()
@@ -404,6 +409,329 @@ class TestImageProvider:
pass
+@patch.dict(
+ os.environ, {"REGISTRY_USERNAME": "", "REGISTRY_PASSWORD": "", "REGISTRY_TOKEN": ""}
+)
+class TestImageProviderRegistryAuth:
+ def test_no_auth_by_default(self):
+ """Test that no auth is set when no credentials are provided."""
+ provider = _make_provider()
+
+ assert not provider.registry_username
+ assert not provider.registry_password
+ assert not provider.registry_token
+ assert provider.auth_method == "No auth"
+
+ def test_basic_auth_with_explicit_params(self):
+ """Test basic auth via explicit constructor params."""
+ provider = _make_provider(
+ registry_username="myuser",
+ registry_password="mypass",
+ )
+
+ assert provider.registry_username == "myuser"
+ assert provider.registry_password == "mypass"
+ assert provider.auth_method == "Basic auth"
+
+ def test_token_auth_with_explicit_param(self):
+ """Test token auth via explicit constructor param."""
+ provider = _make_provider(registry_token="my-token-123")
+
+ assert provider.registry_token == "my-token-123"
+ assert provider.auth_method == "Registry token"
+
+ def test_basic_auth_takes_precedence_over_token(self):
+ """Test that username/password takes precedence over token."""
+ provider = _make_provider(
+ registry_username="myuser",
+ registry_password="mypass",
+ registry_token="my-token",
+ )
+
+ assert provider.auth_method == "Basic auth"
+
+ @patch.dict(
+ os.environ, {"REGISTRY_USERNAME": "envuser", "REGISTRY_PASSWORD": "envpass"}
+ )
+ def test_basic_auth_from_env_vars(self):
+ """Test that env vars are used as fallback for basic auth."""
+ provider = _make_provider()
+
+ assert provider.registry_username == "envuser"
+ assert provider.registry_password == "envpass"
+ assert provider.auth_method == "Basic auth"
+
+ @patch.dict(os.environ, {"REGISTRY_TOKEN": "env-token"})
+ def test_token_auth_from_env_var(self):
+ """Test that env var is used as fallback for token auth."""
+ provider = _make_provider()
+
+ assert provider.registry_token == "env-token"
+ assert provider.auth_method == "Registry token"
+
+ @patch.dict(
+ os.environ, {"REGISTRY_USERNAME": "envuser", "REGISTRY_PASSWORD": "envpass"}
+ )
+ def test_explicit_params_override_env_vars(self):
+ """Test that explicit params take precedence over env vars."""
+ provider = _make_provider(
+ registry_username="explicit",
+ registry_password="explicit-pass",
+ )
+
+ assert provider.registry_username == "explicit"
+ assert provider.registry_password == "explicit-pass"
+
+ def test_build_trivy_env_no_auth(self):
+ """Test that _build_trivy_env returns base env when no auth."""
+ provider = _make_provider()
+ env = provider._build_trivy_env()
+
+ assert "TRIVY_USERNAME" not in env
+ assert "TRIVY_PASSWORD" not in env
+ assert "TRIVY_REGISTRY_TOKEN" not in env
+
+ def test_build_trivy_env_basic_auth_injects_trivy_vars(self):
+ """Test that _build_trivy_env injects TRIVY_USERNAME/PASSWORD for Trivy native auth."""
+ provider = _make_provider(
+ registry_username="myuser",
+ registry_password="mypass",
+ )
+ env = provider._build_trivy_env()
+
+ assert env["TRIVY_USERNAME"] == "myuser"
+ assert env["TRIVY_PASSWORD"] == "mypass"
+
+ def test_build_trivy_env_token_auth(self):
+ """Test that _build_trivy_env injects registry token."""
+ provider = _make_provider(registry_token="my-token")
+ env = provider._build_trivy_env()
+
+ assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
+
+ @patch("subprocess.run")
+ def test_execute_trivy_injects_trivy_env_with_basic_auth(self, mock_subprocess):
+ """Test that _execute_trivy sets TRIVY_USERNAME/PASSWORD for Trivy native auth."""
+ provider = _make_provider(
+ registry_username="myuser",
+ registry_password="mypass",
+ )
+ mock_subprocess.return_value = MagicMock(
+ returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
+ )
+
+ provider._execute_trivy(["trivy", "image", "alpine:3.18"], "alpine:3.18")
+
+ call_kwargs = mock_subprocess.call_args
+ env = call_kwargs.kwargs.get("env") or call_kwargs[1].get("env")
+ assert env["TRIVY_USERNAME"] == "myuser"
+ assert env["TRIVY_PASSWORD"] == "mypass"
+
+ @patch("subprocess.run")
+ def test_test_connection_with_basic_auth(self, mock_subprocess):
+ """Test test_connection passes TRIVY_USERNAME/PASSWORD via env for Trivy native auth."""
+ mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
+
+ result = ImageProvider.test_connection(
+ image="private.registry.io/myapp:v1",
+ registry_username="myuser",
+ registry_password="mypass",
+ )
+
+ assert result.is_connected is True
+ # Should have 1 subprocess call: trivy only (no docker login/pull/logout)
+ assert mock_subprocess.call_count == 1
+ trivy_call = mock_subprocess.call_args
+ assert trivy_call.args[0][0] == "trivy"
+ env = trivy_call.kwargs.get("env") or trivy_call[1].get("env")
+ assert env["TRIVY_USERNAME"] == "myuser"
+ assert env["TRIVY_PASSWORD"] == "mypass"
+
+ @patch("subprocess.run")
+ def test_test_connection_with_token(self, mock_subprocess):
+ """Test test_connection passes token via env."""
+ mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
+
+ result = ImageProvider.test_connection(
+ image="private.registry.io/myapp:v1",
+ registry_token="my-token",
+ )
+
+ assert result.is_connected is True
+ call_kwargs = mock_subprocess.call_args
+ env = call_kwargs.kwargs.get("env") or call_kwargs[1].get("env")
+ assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
+
+ def test_print_credentials_shows_auth_method(self):
+ """Test that print_credentials outputs the auth method."""
+ provider = _make_provider(
+ registry_username="myuser",
+ registry_password="mypass",
+ )
+ with mock.patch("builtins.print") as mock_print:
+ provider.print_credentials()
+ output = " ".join(
+ str(call.args[0]) for call in mock_print.call_args_list if call.args
+ )
+ assert "Basic auth" in output
+
+
+class TestExtractRegistry:
+ def test_docker_hub_simple(self):
+ assert ImageProvider._extract_registry("alpine:3.18") is None
+
+ def test_docker_hub_with_namespace(self):
+ assert ImageProvider._extract_registry("andoniaf/test-private:tag") is None
+
+ def test_ghcr(self):
+ assert ImageProvider._extract_registry("ghcr.io/user/image:tag") == "ghcr.io"
+
+ def test_ecr(self):
+ assert (
+ ImageProvider._extract_registry(
+ "123456789012.dkr.ecr.us-east-1.amazonaws.com/repo:tag"
+ )
+ == "123456789012.dkr.ecr.us-east-1.amazonaws.com"
+ )
+
+ def test_localhost_with_port(self):
+ assert (
+ ImageProvider._extract_registry("localhost:5000/myimage:latest")
+ == "localhost:5000"
+ )
+
+ def test_custom_registry_with_port(self):
+ assert (
+ ImageProvider._extract_registry("myregistry.io:5000/image:tag")
+ == "myregistry.io:5000"
+ )
+
+ def test_digest_reference(self):
+ assert (
+ ImageProvider._extract_registry("ghcr.io/user/image@sha256:abc123")
+ == "ghcr.io"
+ )
+
+ def test_bare_image_name(self):
+ assert ImageProvider._extract_registry("nginx") is None
+
+
+class TestTrivyAuthIntegration:
+ @patch("subprocess.run")
+ def test_run_scan_passes_trivy_env_with_credentials(self, mock_subprocess):
+ """Test that run_scan() passes TRIVY_USERNAME/PASSWORD via env when credentials are set."""
+ mock_subprocess.return_value = MagicMock(
+ returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
+ )
+ provider = _make_provider(
+ images=["ghcr.io/user/image:tag"],
+ registry_username="myuser",
+ registry_password="mypass",
+ )
+
+ reports = []
+ for batch in provider.run_scan():
+ reports.extend(batch)
+
+ calls = mock_subprocess.call_args_list
+ # Only trivy calls, no docker login/pull
+ assert all(call.args[0][0] == "trivy" for call in calls)
+ env = calls[0].kwargs.get("env") or calls[0][1].get("env")
+ assert env["TRIVY_USERNAME"] == "myuser"
+ assert env["TRIVY_PASSWORD"] == "mypass"
+
+ @patch.dict(os.environ, {"REGISTRY_USERNAME": "", "REGISTRY_PASSWORD": ""})
+ @patch("subprocess.run")
+ def test_run_scan_no_trivy_auth_without_credentials(self, mock_subprocess):
+ """Test that run_scan() does NOT set TRIVY_USERNAME/PASSWORD when no credentials."""
+ mock_subprocess.return_value = MagicMock(
+ returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
+ )
+ provider = _make_provider()
+
+ for batch in provider.run_scan():
+ pass
+
+ calls = mock_subprocess.call_args_list
+ assert all(call.args[0][0] == "trivy" for call in calls)
+
+ @patch.dict(os.environ, {"REGISTRY_USERNAME": "", "REGISTRY_PASSWORD": ""})
+ @patch("subprocess.run")
+ def test_run_scan_token_auth_via_env(self, mock_subprocess):
+ """Test that run_scan() passes TRIVY_REGISTRY_TOKEN when only token is provided."""
+ mock_subprocess.return_value = MagicMock(
+ returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
+ )
+ provider = _make_provider(registry_token="my-token")
+
+ for batch in provider.run_scan():
+ pass
+
+ calls = mock_subprocess.call_args_list
+ assert all(call.args[0][0] == "trivy" for call in calls)
+ env = calls[0].kwargs.get("env") or calls[0][1].get("env")
+ assert env["TRIVY_REGISTRY_TOKEN"] == "my-token"
+
+ @patch("subprocess.run")
+ def test_run_with_credentials_only_calls_trivy(self, mock_subprocess):
+ """Test that run() only calls trivy (no docker login/pull/logout)."""
+ mock_subprocess.return_value = MagicMock(
+ returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
+ )
+ provider = _make_provider(
+ images=["ghcr.io/user/image:tag"],
+ registry_username="myuser",
+ registry_password="mypass",
+ )
+
+ provider.run()
+
+ calls = mock_subprocess.call_args_list
+ assert all(call.args[0][0] == "trivy" for call in calls)
+
+ @patch("subprocess.run")
+ def test_run_scan_multiple_images_all_get_trivy_env(self, mock_subprocess):
+ """Test that all trivy calls get TRIVY_USERNAME/PASSWORD when scanning multiple images."""
+ mock_subprocess.return_value = MagicMock(
+ returncode=0, stdout=get_sample_trivy_json_output(), stderr=""
+ )
+ provider = _make_provider(
+ images=["ghcr.io/user/image1:tag", "ghcr.io/user/image2:tag"],
+ registry_username="myuser",
+ registry_password="mypass",
+ )
+
+ for batch in provider.run_scan():
+ pass
+
+ calls = mock_subprocess.call_args_list
+ trivy_calls = [c for c in calls if c.args[0][0] == "trivy"]
+ assert len(trivy_calls) == 2
+ for call in trivy_calls:
+ env = call.kwargs.get("env") or call[1].get("env")
+ assert env["TRIVY_USERNAME"] == "myuser"
+ assert env["TRIVY_PASSWORD"] == "mypass"
+
+ @patch("subprocess.run")
+ def test_test_connection_docker_hub_uses_trivy_auth(self, mock_subprocess):
+ """Test test_connection passes TRIVY creds for Docker Hub images."""
+ mock_subprocess.return_value = MagicMock(returncode=0, stderr="")
+
+ result = ImageProvider.test_connection(
+ image="andoniaf/test-private:tag",
+ registry_username="myuser",
+ registry_password="mypass",
+ )
+
+ assert result.is_connected is True
+ assert mock_subprocess.call_count == 1
+ trivy_call = mock_subprocess.call_args
+ assert trivy_call.args[0][0] == "trivy"
+ env = trivy_call.kwargs.get("env") or trivy_call[1].get("env")
+ assert env["TRIVY_USERNAME"] == "myuser"
+ assert env["TRIVY_PASSWORD"] == "mypass"
+
+
class TestImageProviderInputValidation:
def test_invalid_timeout_format_raises_error(self):
"""Test that a non-matching timeout string raises ImageInvalidTimeoutError."""
diff --git a/tests/providers/image/lib/__init__.py b/tests/providers/image/lib/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tests/providers/image/lib/registry/__init__.py b/tests/providers/image/lib/registry/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tests/providers/image/lib/registry/test_arguments.py b/tests/providers/image/lib/registry/test_arguments.py
new file mode 100644
index 0000000000..5b52d385fd
--- /dev/null
+++ b/tests/providers/image/lib/registry/test_arguments.py
@@ -0,0 +1,223 @@
+from argparse import Namespace
+
+from prowler.providers.image.lib.arguments.arguments import validate_arguments
+
+
+class TestValidateArguments:
+ def test_no_source_fails(self):
+ args = Namespace(
+ images=[],
+ image_list_file=None,
+ registry=None,
+ image_filter=None,
+ tag_filter=None,
+ max_images=0,
+ registry_insecure=False,
+ registry_list_images=False,
+ )
+ ok, msg = validate_arguments(args)
+ assert not ok
+ assert "--image" in msg
+
+ def test_image_only_passes(self):
+ args = Namespace(
+ images=["nginx:latest"],
+ image_list_file=None,
+ registry=None,
+ image_filter=None,
+ tag_filter=None,
+ max_images=0,
+ registry_insecure=False,
+ registry_list_images=False,
+ )
+ ok, _ = validate_arguments(args)
+ assert ok
+
+ def test_image_list_only_passes(self):
+ args = Namespace(
+ images=[],
+ image_list_file="images.txt",
+ registry=None,
+ image_filter=None,
+ tag_filter=None,
+ max_images=0,
+ registry_insecure=False,
+ registry_list_images=False,
+ )
+ ok, _ = validate_arguments(args)
+ assert ok
+
+ def test_registry_only_passes(self):
+ args = Namespace(
+ images=[],
+ image_list_file=None,
+ registry="myregistry.io",
+ image_filter=None,
+ tag_filter=None,
+ max_images=0,
+ registry_insecure=False,
+ registry_list_images=False,
+ )
+ ok, _ = validate_arguments(args)
+ assert ok
+
+ def test_image_filter_without_registry_fails(self):
+ args = Namespace(
+ images=["nginx:latest"],
+ image_list_file=None,
+ registry=None,
+ image_filter="^prod",
+ tag_filter=None,
+ max_images=0,
+ registry_insecure=False,
+ registry_list_images=False,
+ )
+ ok, msg = validate_arguments(args)
+ assert not ok
+ assert "--image-filter requires --registry" in msg
+
+ def test_tag_filter_without_registry_fails(self):
+ args = Namespace(
+ images=["nginx:latest"],
+ image_list_file=None,
+ registry=None,
+ image_filter=None,
+ tag_filter="^v",
+ max_images=0,
+ registry_insecure=False,
+ registry_list_images=False,
+ )
+ ok, msg = validate_arguments(args)
+ assert not ok
+ assert "--tag-filter requires --registry" in msg
+
+ def test_max_images_without_registry_fails(self):
+ args = Namespace(
+ images=["nginx:latest"],
+ image_list_file=None,
+ registry=None,
+ image_filter=None,
+ tag_filter=None,
+ max_images=50,
+ registry_insecure=False,
+ registry_list_images=False,
+ )
+ ok, msg = validate_arguments(args)
+ assert not ok
+ assert "--max-images requires --registry" in msg
+
+ def test_registry_insecure_without_registry_fails(self):
+ args = Namespace(
+ images=[],
+ image_list_file="i.txt",
+ registry=None,
+ image_filter=None,
+ tag_filter=None,
+ max_images=0,
+ registry_insecure=True,
+ registry_list_images=False,
+ )
+ ok, msg = validate_arguments(args)
+ assert not ok
+ assert "--registry-insecure requires --registry" in msg
+
+ def test_docker_hub_no_namespace_fails(self):
+ args = Namespace(
+ images=[],
+ image_list_file=None,
+ registry="docker.io",
+ image_filter=None,
+ tag_filter=None,
+ max_images=0,
+ registry_insecure=False,
+ registry_list_images=False,
+ )
+ ok, msg = validate_arguments(args)
+ assert not ok
+ assert "namespace" in msg.lower()
+
+ def test_docker_hub_with_namespace_passes(self):
+ args = Namespace(
+ images=[],
+ image_list_file=None,
+ registry="docker.io/myorg",
+ image_filter=None,
+ tag_filter=None,
+ max_images=0,
+ registry_insecure=False,
+ registry_list_images=False,
+ )
+ ok, _ = validate_arguments(args)
+ assert ok
+
+ def test_docker_hub_https_no_namespace_fails(self):
+ args = Namespace(
+ images=[],
+ image_list_file=None,
+ registry="https://docker.io",
+ image_filter=None,
+ tag_filter=None,
+ max_images=0,
+ registry_insecure=False,
+ registry_list_images=False,
+ )
+ ok, msg = validate_arguments(args)
+ assert not ok
+ assert "namespace" in msg.lower()
+
+ def test_registry_with_filters_passes(self):
+ args = Namespace(
+ images=[],
+ image_list_file=None,
+ registry="myregistry.io",
+ image_filter="^prod",
+ tag_filter="^v",
+ max_images=100,
+ registry_insecure=True,
+ registry_list_images=False,
+ )
+ ok, _ = validate_arguments(args)
+ assert ok
+
+ def test_registry_list_without_registry_fails(self):
+ args = Namespace(
+ images=["nginx:latest"],
+ image_list_file=None,
+ registry=None,
+ image_filter=None,
+ tag_filter=None,
+ max_images=0,
+ registry_insecure=False,
+ registry_list_images=True,
+ )
+ ok, msg = validate_arguments(args)
+ assert not ok
+ assert "--registry-list requires --registry" in msg
+
+ def test_registry_list_with_registry_passes(self):
+ args = Namespace(
+ images=[],
+ image_list_file=None,
+ registry="myregistry.io",
+ image_filter=None,
+ tag_filter=None,
+ max_images=0,
+ registry_insecure=False,
+ registry_list_images=True,
+ )
+ ok, _ = validate_arguments(args)
+ assert ok
+
+ def test_combined_registry_and_image_passes(self):
+ args = Namespace(
+ images=["nginx:latest"],
+ image_list_file=None,
+ registry="myregistry.io",
+ image_filter=None,
+ tag_filter=None,
+ max_images=0,
+ registry_insecure=False,
+ registry_list_images=False,
+ )
+ ok, _ = validate_arguments(args)
+ assert ok
diff --git a/tests/providers/image/lib/registry/test_dockerhub_adapter.py b/tests/providers/image/lib/registry/test_dockerhub_adapter.py
new file mode 100644
index 0000000000..4f2d91d57c
--- /dev/null
+++ b/tests/providers/image/lib/registry/test_dockerhub_adapter.py
@@ -0,0 +1,163 @@
+from unittest.mock import MagicMock, patch
+
+import pytest
+import requests
+
+from prowler.providers.image.exceptions.exceptions import (
+ ImageRegistryAuthError,
+ ImageRegistryCatalogError,
+ ImageRegistryNetworkError,
+)
+from prowler.providers.image.lib.registry.dockerhub_adapter import DockerHubAdapter
+
+
+class TestDockerHubAdapterInit:
+ def test_extract_namespace_simple(self):
+ assert DockerHubAdapter._extract_namespace("docker.io/myorg") == "myorg"
+
+ def test_extract_namespace_https(self):
+ assert DockerHubAdapter._extract_namespace("https://docker.io/myorg") == "myorg"
+
+ def test_extract_namespace_registry1(self):
+ assert (
+ DockerHubAdapter._extract_namespace("registry-1.docker.io/myorg") == "myorg"
+ )
+
+ def test_extract_namespace_empty(self):
+ assert DockerHubAdapter._extract_namespace("docker.io") == ""
+
+ def test_extract_namespace_with_slash(self):
+ assert DockerHubAdapter._extract_namespace("docker.io/myorg/") == "myorg"
+
+
+class TestDockerHubListRepositories:
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_list_repos(self, mock_request):
+ # Hub login (now goes through requests.request via _request_with_retry)
+ login_resp = MagicMock(status_code=200)
+ login_resp.json.return_value = {"token": "jwt"}
+ # Repo listing
+ repos_resp = MagicMock(status_code=200)
+ repos_resp.json.return_value = {
+ "results": [{"name": "app1"}, {"name": "app2"}],
+ "next": None,
+ }
+ mock_request.side_effect = [login_resp, repos_resp]
+ adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
+ repos = adapter.list_repositories()
+ assert repos == ["myorg/app1", "myorg/app2"]
+
+ def test_list_repos_no_namespace_raises(self):
+ adapter = DockerHubAdapter("docker.io")
+ with pytest.raises(ImageRegistryCatalogError, match="namespace"):
+ adapter.list_repositories()
+
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_list_repos_public_no_credentials(self, mock_request):
+ """When no credentials are provided, use the public /v2/repositories/{ns}/ endpoint."""
+ repos_resp = MagicMock(status_code=200)
+ repos_resp.json.return_value = {
+ "results": [{"name": "repo1"}, {"name": "repo2"}],
+ "next": None,
+ }
+ mock_request.return_value = repos_resp
+ adapter = DockerHubAdapter("docker.io/publicns")
+ repos = adapter.list_repositories()
+ assert repos == ["publicns/repo1", "publicns/repo2"]
+ called_url = mock_request.call_args[0][1]
+ assert "/v2/repositories/publicns/" in called_url
+ assert "/v2/namespaces/" not in called_url
+
+
+class TestDockerHubListTags:
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_list_tags(self, mock_request):
+ # Token exchange (now goes through requests.request via _request_with_retry)
+ token_resp = MagicMock(status_code=200)
+ token_resp.json.return_value = {"token": "registry-token"}
+ # Tag listing
+ tags_resp = MagicMock(status_code=200, headers={})
+ tags_resp.json.return_value = {"tags": ["latest", "v1.0"]}
+ mock_request.side_effect = [token_resp, tags_resp]
+ adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
+ tags = adapter.list_tags("myorg/myapp")
+ assert tags == ["latest", "v1.0"]
+
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_list_tags_auth_failure(self, mock_request):
+ # Token exchange
+ token_resp = MagicMock(status_code=200)
+ token_resp.json.return_value = {"token": "tok"}
+ # Tag listing returns 401
+ tags_resp = MagicMock(status_code=401)
+ mock_request.side_effect = [token_resp, tags_resp]
+ adapter = DockerHubAdapter("docker.io/myorg")
+ with pytest.raises(ImageRegistryAuthError):
+ adapter.list_tags("myorg/myapp")
+
+
+class TestDockerHubLogin:
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_login_failure(self, mock_request):
+ resp = MagicMock(status_code=401)
+ mock_request.return_value = resp
+ adapter = DockerHubAdapter("docker.io/myorg", username="bad", password="creds")
+ with pytest.raises(ImageRegistryAuthError, match="login failed"):
+ adapter._hub_login()
+
+ def test_login_skipped_without_credentials(self):
+ adapter = DockerHubAdapter("docker.io/myorg")
+ adapter._hub_login() # Should not raise
+ assert adapter._hub_jwt is None
+
+
+class TestDockerHubRetry:
+ @patch("prowler.providers.image.lib.registry.base.time.sleep")
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_retry_on_429(self, mock_request, mock_sleep):
+ resp_429 = MagicMock(status_code=429)
+ resp_200 = MagicMock(status_code=200)
+ mock_request.side_effect = [resp_429, resp_200]
+ adapter = DockerHubAdapter("docker.io/myorg")
+ result = adapter._request_with_retry(
+ "GET", "https://hub.docker.com/v2/namespaces/myorg/repositories"
+ )
+ assert result.status_code == 200
+
+ @patch("prowler.providers.image.lib.registry.base.time.sleep")
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_connection_error_retries(self, mock_request, mock_sleep):
+ mock_request.side_effect = requests.exceptions.ConnectionError("fail")
+ adapter = DockerHubAdapter("docker.io/myorg")
+ with pytest.raises(ImageRegistryNetworkError):
+ adapter._request_with_retry("GET", "https://hub.docker.com")
+ assert mock_request.call_count == 3
+
+
+class TestDockerHubEmptyTokens:
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_empty_hub_jwt_raises(self, mock_request):
+ resp = MagicMock(status_code=200)
+ resp.json.return_value = {"token": ""}
+ mock_request.return_value = resp
+ adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
+ with pytest.raises(ImageRegistryAuthError, match="empty JWT"):
+ adapter._hub_login()
+
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_none_hub_jwt_raises(self, mock_request):
+ resp = MagicMock(status_code=200)
+ resp.json.return_value = {}
+ mock_request.return_value = resp
+ adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
+ with pytest.raises(ImageRegistryAuthError, match="empty JWT"):
+ adapter._hub_login()
+
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_empty_registry_token_raises(self, mock_request):
+ resp = MagicMock(status_code=200)
+ resp.json.return_value = {"token": ""}
+ mock_request.return_value = resp
+ adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
+ with pytest.raises(ImageRegistryAuthError, match="empty token"):
+ adapter._get_registry_token("myorg/myapp")
diff --git a/tests/providers/image/lib/registry/test_factory.py b/tests/providers/image/lib/registry/test_factory.py
new file mode 100644
index 0000000000..74aab82d02
--- /dev/null
+++ b/tests/providers/image/lib/registry/test_factory.py
@@ -0,0 +1,34 @@
+from prowler.providers.image.lib.registry.dockerhub_adapter import DockerHubAdapter
+from prowler.providers.image.lib.registry.factory import create_registry_adapter
+from prowler.providers.image.lib.registry.oci_adapter import OciRegistryAdapter
+
+
+class TestCreateRegistryAdapter:
+ def test_docker_hub_returns_dockerhub_adapter(self):
+ adapter = create_registry_adapter("docker.io/myorg")
+ assert isinstance(adapter, DockerHubAdapter)
+
+ def test_oci_returns_oci_adapter(self):
+ adapter = create_registry_adapter("myregistry.io")
+ assert isinstance(adapter, OciRegistryAdapter)
+
+ def test_ecr_returns_oci_adapter(self):
+ adapter = create_registry_adapter("123456789.dkr.ecr.us-east-1.amazonaws.com")
+ assert isinstance(adapter, OciRegistryAdapter)
+
+ def test_passes_credentials(self):
+ adapter = create_registry_adapter(
+ "myregistry.io",
+ username="user",
+ password="pass",
+ token="tok",
+ verify_ssl=False,
+ )
+ assert adapter.username == "user"
+ assert adapter.password == "pass"
+ assert adapter.token == "tok"
+ assert adapter.verify_ssl is False
+
+ def test_registry_1_docker_io(self):
+ adapter = create_registry_adapter("registry-1.docker.io/myorg")
+ assert isinstance(adapter, DockerHubAdapter)
diff --git a/tests/providers/image/lib/registry/test_oci_adapter.py b/tests/providers/image/lib/registry/test_oci_adapter.py
new file mode 100644
index 0000000000..5deda42fe2
--- /dev/null
+++ b/tests/providers/image/lib/registry/test_oci_adapter.py
@@ -0,0 +1,426 @@
+import base64
+from unittest.mock import MagicMock, patch
+
+import pytest
+import requests
+
+from prowler.providers.image.exceptions.exceptions import (
+ ImageRegistryAuthError,
+ ImageRegistryCatalogError,
+ ImageRegistryNetworkError,
+)
+from prowler.providers.image.lib.registry.oci_adapter import OciRegistryAdapter
+
+
+class TestOciAdapterInit:
+ def test_normalise_url_adds_https(self):
+ adapter = OciRegistryAdapter("myregistry.io")
+ assert adapter._base_url == "https://myregistry.io"
+
+ def test_normalise_url_keeps_http(self):
+ adapter = OciRegistryAdapter("http://myregistry.io")
+ assert adapter._base_url == "http://myregistry.io"
+
+ def test_normalise_url_strips_trailing_slash(self):
+ adapter = OciRegistryAdapter("https://myregistry.io/")
+ assert adapter._base_url == "https://myregistry.io"
+
+ def test_stores_credentials(self):
+ adapter = OciRegistryAdapter(
+ "reg.io", username="u", password="p", token="t", verify_ssl=False
+ )
+ assert adapter.username == "u"
+ assert adapter.password == "p"
+ assert adapter.token == "t"
+ assert adapter.verify_ssl is False
+
+
+class TestOciAdapterAuth:
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_ensure_auth_with_token(self, mock_request):
+ adapter = OciRegistryAdapter("reg.io", token="my-token")
+ adapter._ensure_auth()
+ assert adapter._bearer_token == "my-token"
+ mock_request.assert_not_called()
+
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_ensure_auth_anonymous_ok(self, mock_request):
+ resp = MagicMock(status_code=200)
+ mock_request.return_value = resp
+ adapter = OciRegistryAdapter("reg.io")
+ adapter._ensure_auth()
+ assert adapter._bearer_token is None
+
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_ensure_auth_bearer_challenge(self, mock_request):
+ ping_resp = MagicMock(
+ status_code=401,
+ headers={
+ "Www-Authenticate": 'Bearer realm="https://auth.example.com/token",service="registry"'
+ },
+ )
+ token_resp = MagicMock(status_code=200)
+ token_resp.json.return_value = {"token": "bearer-tok"}
+ mock_request.side_effect = [ping_resp, token_resp]
+ adapter = OciRegistryAdapter("reg.io", username="u", password="p")
+ adapter._ensure_auth()
+ assert adapter._bearer_token == "bearer-tok"
+
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_ensure_auth_403_raises(self, mock_request):
+ resp = MagicMock(status_code=403)
+ mock_request.return_value = resp
+ adapter = OciRegistryAdapter("reg.io")
+ with pytest.raises(ImageRegistryAuthError):
+ adapter._ensure_auth()
+
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_ensure_auth_basic_challenge_with_creds(self, mock_request):
+ ping_resp = MagicMock(
+ status_code=401,
+ headers={"Www-Authenticate": 'Basic realm="https://ecr.aws"'},
+ )
+ mock_request.return_value = ping_resp
+ adapter = OciRegistryAdapter("ecr.aws", username="AWS", password="tok")
+ adapter._ensure_auth()
+ assert adapter._basic_auth_verified is True
+ assert adapter._bearer_token is None
+
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_ensure_auth_basic_challenge_no_creds(self, mock_request):
+ ping_resp = MagicMock(
+ status_code=401,
+ headers={"Www-Authenticate": 'Basic realm="https://ecr.aws"'},
+ )
+ mock_request.return_value = ping_resp
+ adapter = OciRegistryAdapter("ecr.aws")
+ with pytest.raises(ImageRegistryAuthError):
+ adapter._ensure_auth()
+
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_basic_auth_used_in_requests(self, mock_request):
+ ping_resp = MagicMock(
+ status_code=401,
+ headers={"Www-Authenticate": 'Basic realm="https://ecr.aws"'},
+ )
+ catalog_resp = MagicMock(status_code=200, headers={})
+ catalog_resp.json.return_value = {"repositories": ["myapp"]}
+ mock_request.side_effect = [ping_resp, catalog_resp]
+ adapter = OciRegistryAdapter("ecr.aws", username="AWS", password="tok")
+ adapter._ensure_auth()
+ adapter._authed_request("GET", "https://ecr.aws/v2/_catalog")
+ # The catalog request should use Basic auth (auth kwarg), not Bearer header
+ call_kwargs = mock_request.call_args_list[1][1]
+ assert call_kwargs.get("auth") == ("AWS", "tok")
+ assert "Authorization" not in call_kwargs.get("headers", {})
+
+ def test_resolve_basic_credentials_decodes_base64_token(self):
+ raw_password = "real-jwt-password"
+ encoded = base64.b64encode(f"AWS:{raw_password}".encode()).decode()
+ adapter = OciRegistryAdapter("ecr.aws", username="AWS", password=encoded)
+ user, pwd = adapter._resolve_basic_credentials()
+ assert user == "AWS"
+ assert pwd == raw_password
+
+ def test_resolve_basic_credentials_passthrough_raw_password(self):
+ adapter = OciRegistryAdapter("ecr.aws", username="AWS", password="plain-pass")
+ user, pwd = adapter._resolve_basic_credentials()
+ assert user == "AWS"
+ assert pwd == "plain-pass"
+
+ def test_resolve_basic_credentials_passthrough_invalid_base64(self):
+ adapter = OciRegistryAdapter(
+ "ecr.aws", username="AWS", password="not!valid~base64"
+ )
+ user, pwd = adapter._resolve_basic_credentials()
+ assert user == "AWS"
+ assert pwd == "not!valid~base64"
+
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_basic_auth_decodes_ecr_token_in_request(self, mock_request):
+ raw_password = "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.abc"
+ encoded = base64.b64encode(f"AWS:{raw_password}".encode()).decode()
+ ping_resp = MagicMock(
+ status_code=401,
+ headers={"Www-Authenticate": 'Basic realm="https://ecr.aws"'},
+ )
+ catalog_resp = MagicMock(status_code=200, headers={})
+ catalog_resp.json.return_value = {"repositories": ["myapp"]}
+ mock_request.side_effect = [ping_resp, catalog_resp]
+ adapter = OciRegistryAdapter("ecr.aws", username="AWS", password=encoded)
+ adapter._ensure_auth()
+ adapter._authed_request("GET", "https://ecr.aws/v2/_catalog")
+ call_kwargs = mock_request.call_args_list[1][1]
+ assert call_kwargs.get("auth") == ("AWS", raw_password)
+
+ def test_resolve_basic_credentials_none_password(self):
+ adapter = OciRegistryAdapter("ecr.aws", username="AWS", password=None)
+ user, pwd = adapter._resolve_basic_credentials()
+ assert user == "AWS"
+ assert pwd is None
+
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_authed_request_retries_on_401_with_bearer(self, mock_request):
+ adapter = OciRegistryAdapter("reg.io", username="u", password="p")
+ adapter._bearer_token = "expired-token"
+ # First request: 401 (expired token)
+ resp_401 = MagicMock(status_code=401)
+ # _ensure_auth ping: 401 with bearer challenge
+ ping_resp = MagicMock(
+ status_code=401,
+ headers={
+ "Www-Authenticate": 'Bearer realm="https://auth.reg.io/token",service="registry"'
+ },
+ )
+ # Token exchange: success
+ token_resp = MagicMock(status_code=200)
+ token_resp.json.return_value = {"token": "new-token"}
+ # Second request: 200 (new token works)
+ resp_200 = MagicMock(status_code=200)
+ mock_request.side_effect = [resp_401, ping_resp, token_resp, resp_200]
+ result = adapter._authed_request("GET", "https://reg.io/v2/myapp/tags/list")
+ assert result.status_code == 200
+ assert adapter._bearer_token == "new-token"
+ assert mock_request.call_count == 4
+
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_authed_request_no_retry_on_401_without_bearer(self, mock_request):
+ adapter = OciRegistryAdapter("reg.io", username="u", password="p")
+ adapter._basic_auth_verified = True
+ # No bearer token — using basic auth
+ resp_401 = MagicMock(status_code=401)
+ mock_request.return_value = resp_401
+ result = adapter._authed_request("GET", "https://reg.io/v2/_catalog")
+ assert result.status_code == 401
+ # Should only be called once (no retry for basic auth)
+ assert mock_request.call_count == 1
+
+
+class TestOciAdapterListRepositories:
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_list_repos_single_page(self, mock_request):
+ ping_resp = MagicMock(status_code=200)
+ catalog_resp = MagicMock(status_code=200, headers={})
+ catalog_resp.json.return_value = {
+ "repositories": ["app/frontend", "app/backend"]
+ }
+ mock_request.side_effect = [ping_resp, catalog_resp]
+ adapter = OciRegistryAdapter("reg.io")
+ repos = adapter.list_repositories()
+ assert repos == ["app/frontend", "app/backend"]
+
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_list_repos_paginated(self, mock_request):
+ ping_resp = MagicMock(status_code=200)
+ page1_resp = MagicMock(
+ status_code=200,
+ headers={"Link": '; rel="next"'},
+ )
+ page1_resp.json.return_value = {"repositories": ["a"]}
+ page2_resp = MagicMock(status_code=200, headers={})
+ page2_resp.json.return_value = {"repositories": ["b"]}
+ mock_request.side_effect = [ping_resp, page1_resp, page2_resp]
+ adapter = OciRegistryAdapter("reg.io")
+ repos = adapter.list_repositories()
+ assert repos == ["a", "b"]
+
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_list_repos_404_raises(self, mock_request):
+ ping_resp = MagicMock(status_code=200)
+ catalog_resp = MagicMock(status_code=404)
+ mock_request.side_effect = [ping_resp, catalog_resp]
+ adapter = OciRegistryAdapter("reg.io")
+ with pytest.raises(ImageRegistryCatalogError):
+ adapter.list_repositories()
+
+
+class TestOciAdapterListTags:
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_list_tags(self, mock_request):
+ ping_resp = MagicMock(status_code=200)
+ tags_resp = MagicMock(status_code=200, headers={})
+ tags_resp.json.return_value = {"tags": ["latest", "v1.0"]}
+ mock_request.side_effect = [ping_resp, tags_resp]
+ adapter = OciRegistryAdapter("reg.io")
+ tags = adapter.list_tags("myapp")
+ assert tags == ["latest", "v1.0"]
+
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_list_tags_null_tags(self, mock_request):
+ ping_resp = MagicMock(status_code=200)
+ tags_resp = MagicMock(status_code=200, headers={})
+ tags_resp.json.return_value = {"tags": None}
+ mock_request.side_effect = [ping_resp, tags_resp]
+ adapter = OciRegistryAdapter("reg.io")
+ tags = adapter.list_tags("myapp")
+ assert tags == []
+
+
+class TestOciAdapterRetry:
+ @patch("prowler.providers.image.lib.registry.base.time.sleep")
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_retry_on_429(self, mock_request, mock_sleep):
+ resp_429 = MagicMock(status_code=429)
+ resp_200 = MagicMock(status_code=200)
+ mock_request.side_effect = [resp_429, resp_200]
+ adapter = OciRegistryAdapter("reg.io")
+ result = adapter._request_with_retry("GET", "https://reg.io/v2/")
+ assert result.status_code == 200
+ mock_sleep.assert_called_once()
+
+ @patch("prowler.providers.image.lib.registry.base.time.sleep")
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_connection_error_retries(self, mock_request, mock_sleep):
+ mock_request.side_effect = requests.exceptions.ConnectionError("failed")
+ adapter = OciRegistryAdapter("reg.io")
+ with pytest.raises(ImageRegistryNetworkError):
+ adapter._request_with_retry("GET", "https://reg.io/v2/")
+ assert mock_request.call_count == 3
+
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_timeout_raises_immediately(self, mock_request):
+ mock_request.side_effect = requests.exceptions.Timeout("timeout")
+ adapter = OciRegistryAdapter("reg.io")
+ with pytest.raises(ImageRegistryNetworkError):
+ adapter._request_with_retry("GET", "https://reg.io/v2/")
+ assert mock_request.call_count == 1
+
+
+class TestOciAdapterNextPageUrl:
+ def test_no_link_header(self):
+ adapter = OciRegistryAdapter("reg.io")
+ resp = MagicMock(headers={})
+ assert adapter._next_page_url(resp) is None
+
+ def test_link_header_with_next(self):
+ adapter = OciRegistryAdapter("reg.io")
+ resp = MagicMock(
+ headers={"Link": '; rel="next"'}
+ )
+ assert adapter._next_page_url(resp) == "https://reg.io/v2/_catalog?n=200&last=b"
+
+ def test_link_header_no_next(self):
+ adapter = OciRegistryAdapter("reg.io")
+ resp = MagicMock(
+ headers={"Link": '; rel="prev"'}
+ )
+ assert adapter._next_page_url(resp) is None
+
+ def test_link_header_relative_url(self):
+ adapter = OciRegistryAdapter("reg.io")
+ resp = MagicMock(
+ url="https://reg.io/v2/_catalog?n=200",
+ headers={"Link": '; rel="next"'},
+ )
+ assert adapter._next_page_url(resp) == "https://reg.io/v2/_catalog?last=b&n=200"
+
+
+class TestOciAdapterSSRF:
+ def test_reject_file_scheme(self):
+ adapter = OciRegistryAdapter("reg.io")
+ with pytest.raises(ImageRegistryAuthError, match="disallowed scheme"):
+ adapter._validate_realm_url("file:///etc/passwd")
+
+ def test_reject_ftp_scheme(self):
+ adapter = OciRegistryAdapter("reg.io")
+ with pytest.raises(ImageRegistryAuthError, match="disallowed scheme"):
+ adapter._validate_realm_url("ftp://evil.com/token")
+
+ def test_reject_private_ip(self):
+ adapter = OciRegistryAdapter("reg.io")
+ with pytest.raises(ImageRegistryAuthError, match="private/loopback"):
+ adapter._validate_realm_url("https://10.0.0.1/token")
+
+ def test_reject_loopback(self):
+ adapter = OciRegistryAdapter("reg.io")
+ with pytest.raises(ImageRegistryAuthError, match="private/loopback"):
+ adapter._validate_realm_url("https://127.0.0.1/token")
+
+ def test_reject_link_local(self):
+ adapter = OciRegistryAdapter("reg.io")
+ with pytest.raises(ImageRegistryAuthError, match="private/loopback"):
+ adapter._validate_realm_url("https://169.254.169.254/latest/meta-data")
+
+ def test_accept_public_https(self):
+ adapter = OciRegistryAdapter("reg.io")
+ # Should not raise
+ adapter._validate_realm_url("https://auth.example.com/token")
+
+ def test_accept_hostname_not_ip(self):
+ adapter = OciRegistryAdapter("reg.io")
+ # Hostnames (not IPs) should pass even if they resolve to private IPs
+ adapter._validate_realm_url("https://internal.corp.com/token")
+
+
+class TestOciAdapterEmptyToken:
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_empty_bearer_token_raises(self, mock_request):
+ ping_resp = MagicMock(
+ status_code=401,
+ headers={
+ "Www-Authenticate": 'Bearer realm="https://auth.example.com/token",service="registry"'
+ },
+ )
+ token_resp = MagicMock(status_code=200)
+ token_resp.json.return_value = {"token": "", "access_token": ""}
+ mock_request.side_effect = [ping_resp, token_resp]
+ adapter = OciRegistryAdapter("reg.io", username="u", password="p")
+ with pytest.raises(ImageRegistryAuthError, match="empty token"):
+ adapter._ensure_auth()
+
+ @patch("prowler.providers.image.lib.registry.base.requests.request")
+ def test_none_bearer_token_raises(self, mock_request):
+ ping_resp = MagicMock(
+ status_code=401,
+ headers={
+ "Www-Authenticate": 'Bearer realm="https://auth.example.com/token",service="registry"'
+ },
+ )
+ token_resp = MagicMock(status_code=200)
+ token_resp.json.return_value = {}
+ mock_request.side_effect = [ping_resp, token_resp]
+ adapter = OciRegistryAdapter("reg.io", username="u", password="p")
+ with pytest.raises(ImageRegistryAuthError, match="empty token"):
+ adapter._ensure_auth()
+
+
+class TestOciAdapterNarrowExcept:
+ def test_invalid_utf8_base64_falls_through(self):
+ # Create a base64 string that decodes to invalid UTF-8
+ invalid_bytes = base64.b64encode(b"\xff\xfe").decode()
+ adapter = OciRegistryAdapter("ecr.aws", username="AWS", password=invalid_bytes)
+ user, pwd = adapter._resolve_basic_credentials()
+ assert user == "AWS"
+ assert pwd == invalid_bytes
+
+
+class TestCredentialRedaction:
+ def test_getstate_redacts_credentials(self):
+ adapter = OciRegistryAdapter(
+ "reg.io", username="u", password="secret", token="tok"
+ )
+ state = adapter.__getstate__()
+ assert state["_password"] == "***"
+ assert state["_token"] == "***"
+ assert state["username"] == "u"
+ assert state["registry_url"] == "reg.io"
+
+ def test_getstate_none_credentials(self):
+ adapter = OciRegistryAdapter("reg.io")
+ state = adapter.__getstate__()
+ assert state["_password"] is None
+ assert state["_token"] is None
+
+ def test_repr_redacts_credentials(self):
+ adapter = OciRegistryAdapter(
+ "reg.io", username="u", password="s3cret_pw", token="s3cret_tk"
+ )
+ r = repr(adapter)
+ assert "s3cret_pw" not in r
+ assert "s3cret_tk" not in r
+ assert "" in r
+
+ def test_properties_still_work(self):
+ adapter = OciRegistryAdapter("reg.io", password="secret", token="tok")
+ assert adapter.password == "secret"
+ assert adapter.token == "tok"
diff --git a/tests/providers/image/lib/registry/test_provider_registry.py b/tests/providers/image/lib/registry/test_provider_registry.py
new file mode 100644
index 0000000000..86667f0412
--- /dev/null
+++ b/tests/providers/image/lib/registry/test_provider_registry.py
@@ -0,0 +1,238 @@
+import os
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from prowler.providers.image.exceptions.exceptions import (
+ ImageInvalidFilterError,
+ ImageMaxImagesExceededError,
+)
+from prowler.providers.image.image_provider import ImageProvider
+from prowler.providers.image.lib.registry.dockerhub_adapter import DockerHubAdapter
+
+_CLEAN_ENV = {
+ "PATH": os.environ.get("PATH", ""),
+ "HOME": os.environ.get("HOME", ""),
+}
+
+
+def _build_provider(**overrides):
+ defaults = dict(
+ images=[],
+ registry="myregistry.io",
+ image_filter=None,
+ tag_filter=None,
+ max_images=0,
+ registry_insecure=False,
+ registry_list_images=False,
+ config_content={"image": {}},
+ )
+ defaults.update(overrides)
+ with patch.dict(os.environ, _CLEAN_ENV, clear=True):
+ return ImageProvider(**defaults)
+
+
+class TestRegistryEnumeration:
+ @patch("prowler.providers.image.image_provider.create_registry_adapter")
+ def test_enumerate_oci_registry(self, mock_factory):
+ adapter = MagicMock()
+ adapter.list_repositories.return_value = ["app/frontend", "app/backend"]
+ adapter.list_tags.side_effect = [["latest", "v1.0"], ["latest"]]
+ mock_factory.return_value = adapter
+
+ provider = _build_provider()
+ assert "myregistry.io/app/frontend:latest" in provider.images
+ assert "myregistry.io/app/frontend:v1.0" in provider.images
+ assert "myregistry.io/app/backend:latest" in provider.images
+ assert len(provider.images) == 3
+
+ @patch("prowler.providers.image.image_provider.create_registry_adapter")
+ def test_image_filter(self, mock_factory):
+ adapter = MagicMock()
+ adapter.list_repositories.return_value = ["prod/app", "dev/app", "staging/app"]
+ adapter.list_tags.return_value = ["latest"]
+ mock_factory.return_value = adapter
+
+ provider = _build_provider(image_filter="^prod/")
+ assert len(provider.images) == 1
+ assert "myregistry.io/prod/app:latest" in provider.images
+
+ @patch("prowler.providers.image.image_provider.create_registry_adapter")
+ def test_tag_filter(self, mock_factory):
+ adapter = MagicMock()
+ adapter.list_repositories.return_value = ["myapp"]
+ adapter.list_tags.return_value = ["latest", "v1.0", "v2.0", "dev-abc123"]
+ mock_factory.return_value = adapter
+
+ provider = _build_provider(tag_filter=r"^v\d+\.\d+$")
+ assert len(provider.images) == 2
+ assert "myregistry.io/myapp:v1.0" in provider.images
+ assert "myregistry.io/myapp:v2.0" in provider.images
+
+ @patch("prowler.providers.image.image_provider.create_registry_adapter")
+ def test_combined_filters(self, mock_factory):
+ adapter = MagicMock()
+ adapter.list_repositories.return_value = ["prod/app", "dev/app"]
+ adapter.list_tags.return_value = ["latest", "v1.0"]
+ mock_factory.return_value = adapter
+
+ provider = _build_provider(image_filter="^prod/", tag_filter="^v")
+ assert len(provider.images) == 1
+ assert "myregistry.io/prod/app:v1.0" in provider.images
+
+
+class TestMaxImages:
+ @patch("prowler.providers.image.image_provider.create_registry_adapter")
+ def test_max_images_exceeded(self, mock_factory):
+ adapter = MagicMock()
+ adapter.list_repositories.return_value = ["app1", "app2", "app3"]
+ adapter.list_tags.return_value = ["latest", "v1.0"]
+ mock_factory.return_value = adapter
+
+ with pytest.raises(ImageMaxImagesExceededError):
+ _build_provider(max_images=2)
+
+ @patch("prowler.providers.image.image_provider.create_registry_adapter")
+ def test_max_images_not_exceeded(self, mock_factory):
+ adapter = MagicMock()
+ adapter.list_repositories.return_value = ["app1"]
+ adapter.list_tags.return_value = ["latest"]
+ mock_factory.return_value = adapter
+
+ provider = _build_provider(max_images=10)
+ assert len(provider.images) == 1
+
+
+class TestDeduplication:
+ @patch("prowler.providers.image.image_provider.create_registry_adapter")
+ def test_deduplication_with_explicit_images(self, mock_factory):
+ adapter = MagicMock()
+ adapter.list_repositories.return_value = ["myapp"]
+ adapter.list_tags.return_value = ["latest"]
+ mock_factory.return_value = adapter
+
+ provider = _build_provider(images=["myregistry.io/myapp:latest"])
+ assert provider.images.count("myregistry.io/myapp:latest") == 1
+
+
+class TestInvalidFilters:
+ def test_invalid_image_filter_regex(self):
+ with pytest.raises(ImageInvalidFilterError):
+ _build_provider(image_filter="[invalid")
+
+ def test_invalid_tag_filter_regex(self):
+ with pytest.raises(ImageInvalidFilterError):
+ _build_provider(tag_filter="(unclosed")
+
+
+class TestRegistryInsecure:
+ @patch("prowler.providers.image.image_provider.create_registry_adapter")
+ def test_insecure_passes_verify_false(self, mock_factory):
+ adapter = MagicMock()
+ adapter.list_repositories.return_value = ["app"]
+ adapter.list_tags.return_value = ["latest"]
+ mock_factory.return_value = adapter
+
+ _build_provider(registry_insecure=True)
+ mock_factory.assert_called_once()
+ call_kwargs = mock_factory.call_args[1]
+ assert call_kwargs["verify_ssl"] is False
+
+
+class TestEmptyRegistry:
+ @patch("prowler.providers.image.image_provider.create_registry_adapter")
+ def test_empty_catalog_with_explicit_images(self, mock_factory):
+ adapter = MagicMock()
+ adapter.list_repositories.return_value = []
+ mock_factory.return_value = adapter
+
+ provider = _build_provider(images=["nginx:latest"])
+ assert provider.images == ["nginx:latest"]
+
+
+class TestRegistryList:
+ @patch("prowler.providers.image.image_provider.create_registry_adapter")
+ def test_registry_list_prints_and_exits(self, mock_factory, capsys):
+ adapter = MagicMock()
+ adapter.list_repositories.return_value = ["app/frontend", "app/backend"]
+ adapter.list_tags.side_effect = [["latest", "v1.0"], ["latest"]]
+ mock_factory.return_value = adapter
+
+ with pytest.raises(SystemExit) as exc_info:
+ _build_provider(registry_list_images=True)
+
+ assert exc_info.value.code == 0
+ captured = capsys.readouterr()
+ assert "app/frontend" in captured.out
+ assert "app/backend" in captured.out
+ assert "latest" in captured.out
+ assert "v1.0" in captured.out
+ assert "2 repositories" in captured.out
+ assert "3 images" in captured.out
+
+ @patch("prowler.providers.image.image_provider.create_registry_adapter")
+ def test_registry_list_respects_image_filter(self, mock_factory, capsys):
+ adapter = MagicMock()
+ adapter.list_repositories.return_value = ["prod/app", "dev/app"]
+ adapter.list_tags.return_value = ["latest"]
+ mock_factory.return_value = adapter
+
+ with pytest.raises(SystemExit) as exc_info:
+ _build_provider(registry_list_images=True, image_filter="^prod/")
+
+ assert exc_info.value.code == 0
+ captured = capsys.readouterr()
+ assert "prod/app" in captured.out
+ assert "dev/app" not in captured.out
+ assert "1 repository" in captured.out
+
+ @patch("prowler.providers.image.image_provider.create_registry_adapter")
+ def test_registry_list_respects_tag_filter(self, mock_factory, capsys):
+ adapter = MagicMock()
+ adapter.list_repositories.return_value = ["myapp"]
+ adapter.list_tags.return_value = ["latest", "v1.0", "dev-abc"]
+ mock_factory.return_value = adapter
+
+ with pytest.raises(SystemExit) as exc_info:
+ _build_provider(registry_list_images=True, tag_filter=r"^v\d+\.\d+$")
+
+ assert exc_info.value.code == 0
+ captured = capsys.readouterr()
+ assert "v1.0" in captured.out
+ assert "dev-abc" not in captured.out
+ assert "1 image)" in captured.out
+
+ @patch("prowler.providers.image.image_provider.create_registry_adapter")
+ def test_registry_list_skips_max_images(self, mock_factory, capsys):
+ adapter = MagicMock()
+ adapter.list_repositories.return_value = ["app1", "app2", "app3"]
+ adapter.list_tags.return_value = ["latest", "v1.0"]
+ mock_factory.return_value = adapter
+
+ # max_images=1 would normally raise, but --registry-list skips it
+ with pytest.raises(SystemExit) as exc_info:
+ _build_provider(registry_list_images=True, max_images=1)
+
+ assert exc_info.value.code == 0
+ captured = capsys.readouterr()
+ assert "6 images" in captured.out
+
+
+class TestDockerHubEnumeration:
+ @patch("prowler.providers.image.image_provider.create_registry_adapter")
+ def test_dockerhub_images_use_repo_tag_format(self, mock_factory):
+ """Docker Hub images should use repo:tag format without host prefix."""
+ adapter = MagicMock(spec=DockerHubAdapter)
+ adapter.list_repositories.return_value = ["myorg/app1", "myorg/app2"]
+ adapter.list_tags.side_effect = [["latest", "v1.0"], ["latest"]]
+ mock_factory.return_value = adapter
+
+ provider = _build_provider(registry="docker.io/myorg")
+ # Docker Hub images should NOT have host prefix
+ assert "myorg/app1:latest" in provider.images
+ assert "myorg/app1:v1.0" in provider.images
+ assert "myorg/app2:latest" in provider.images
+ # Ensure no host prefix was added
+ for img in provider.images:
+ assert not img.startswith("docker.io/"), f"Unexpected host prefix in {img}"
+ assert len(provider.images) == 3