fix(image): block parser-mismatch SSRF in registry auth (#10945)

This commit is contained in:
Andoni Alonso
2026-04-30 12:56:35 +02:00
committed by GitHub
parent 5987651aee
commit 4608e45c8a
6 changed files with 540 additions and 87 deletions
+4
View File
@@ -18,6 +18,10 @@ All notable changes to the **Prowler SDK** are documented in this file.
- AWS SDK test isolation: autouse `mock_aws` fixture and leak detector in `conftest.py` to prevent tests from hitting real AWS endpoints, with idempotent organization setup for tests calling `set_mocked_aws_provider` multiple times [(#10605)](https://github.com/prowler-cloud/prowler/pull/10605)
### 🔐 Security
- Parser-mismatch SSRF in image provider registry auth where crafted bearer-token realms and pagination links could force requests to internal addresses and leak credentials cross-origin [(#10945)](https://github.com/prowler-cloud/prowler/pull/10945)
---
## [5.25.1] (Prowler v5.25.1)
+140 -10
View File
@@ -2,21 +2,51 @@
from __future__ import annotations
import ipaddress
import re
import socket
import time
from abc import ABC, abstractmethod
from urllib.parse import urlparse
import requests
import tldextract
from prowler.config.config import prowler_version
from prowler.lib.logger import logger
from prowler.providers.image.exceptions.exceptions import ImageRegistryNetworkError
from prowler.providers.image.exceptions.exceptions import (
ImageRegistryAuthError,
ImageRegistryNetworkError,
)
_MAX_RETRIES = 3
_BACKOFF_BASE = 1
_USER_AGENT = f"Prowler/{prowler_version} (registry-adapter)"
_NON_PUBLIC_IP_PROPERTIES = (
"is_private",
"is_loopback",
"is_link_local",
"is_multicast",
"is_reserved",
"is_unspecified",
)
def _ip_is_non_public(ip_str: str) -> bool:
try:
addr = ipaddress.ip_address(ip_str)
except ValueError:
return False
return any(getattr(addr, prop) for prop in _NON_PUBLIC_IP_PROPERTIES)
def _registrable_domain(host: str) -> str | None:
ext = tldextract.extract(host)
if not ext.domain or not ext.suffix:
return None
return f"{ext.domain}.{ext.suffix}"
class RegistryAdapter(ABC):
"""Abstract base class for registry adapters."""
@@ -68,6 +98,107 @@ class RegistryAdapter(ABC):
"""Enumerate all tags for a repository."""
...
def _origin_url(self) -> str:
"""The URL whose host the validator compares against when enforce_origin=True.
Subclasses can override if the effective registry origin differs from
``registry_url`` (e.g., Docker Hub talks to ``registry-1.docker.io``).
"""
return self.registry_url
def _validate_outbound_url(
self,
url: str,
*,
enforce_origin: bool = True,
origin_url: str | None = None,
) -> str:
"""Validate a URL before it is passed to ``requests``.
Defenses against parser-mismatch SSRF (PRWLRHELP-2103):
- canonicalise via ``requests.PreparedRequest`` so validator and connector
parse the same string the same way;
- reject schemes other than http/https;
- reject literal non-public IPs (private, loopback, link-local, ...);
- reject hostnames whose A/AAAA records resolve to non-public IPs;
- when ``enforce_origin=True``, reject hosts that don't share the
registry's registrable domain.
Returns the canonical URL the caller should pass to ``requests``.
"""
parsed = urlparse(url)
if parsed.scheme not in ("http", "https"):
raise ImageRegistryAuthError(
file=__file__,
message=(
f"Disallowed URL scheme: {parsed.scheme!r}. Only http/https are allowed."
),
)
try:
prepared = requests.Request("GET", url).prepare()
except (
requests.exceptions.InvalidURL,
requests.exceptions.MissingSchema,
ValueError,
) as exc:
raise ImageRegistryAuthError(
file=__file__,
message=f"Malformed URL {url!r}: {exc}",
)
canonical_url = prepared.url
canonical = urlparse(canonical_url)
host = canonical.hostname or ""
if not host:
raise ImageRegistryAuthError(
file=__file__,
message=f"URL has no host: {canonical_url}",
)
try:
addr = ipaddress.ip_address(host)
except ValueError:
try:
infos = socket.getaddrinfo(host, None)
except socket.gaierror:
infos = []
for *_, sockaddr in infos:
resolved_ip = sockaddr[0]
if _ip_is_non_public(resolved_ip):
raise ImageRegistryAuthError(
file=__file__,
message=(
f"Host {host!r} resolves to non-public address {resolved_ip}. "
"This may indicate an SSRF attempt."
),
)
else:
if any(getattr(addr, prop) for prop in _NON_PUBLIC_IP_PROPERTIES):
raise ImageRegistryAuthError(
file=__file__,
message=(
f"URL targets a non-public address: {host}. "
"This may indicate an SSRF attempt."
),
)
if enforce_origin:
registry_host = urlparse(origin_url or self._origin_url()).hostname or ""
if registry_host and host != registry_host:
target_d = _registrable_domain(host)
registry_d = _registrable_domain(registry_host)
if not (target_d and registry_d and target_d == registry_d):
raise ImageRegistryAuthError(
file=__file__,
message=(
f"URL host {host!r} is unrelated to registry host "
f"{registry_host!r}; refusing to follow."
),
)
return canonical_url
def _request_with_retry(self, method: str, url: str, **kwargs) -> requests.Response:
context_label = kwargs.pop("context_label", None) or self.registry_url
kwargs.setdefault("timeout", 30)
@@ -131,16 +262,15 @@ class RegistryAdapter(ABC):
original_exception=last_exception,
)
@staticmethod
def _next_page_url(resp: requests.Response) -> str | None:
def _next_page_url(self, resp: requests.Response) -> str | None:
link_header = resp.headers.get("Link", "")
if not link_header:
return None
match = re.search(r'<([^>]+)>;\s*rel="next"', link_header)
if match:
url = match.group(1)
if url.startswith("/"):
parsed = urlparse(resp.url)
return f"{parsed.scheme}://{parsed.netloc}{url}"
return url
return None
if not match:
return None
url = match.group(1)
if url.startswith("/"):
parsed = urlparse(resp.url)
url = f"{parsed.scheme}://{parsed.netloc}{url}"
return self._validate_outbound_url(url)
@@ -207,15 +207,14 @@ class DockerHubAdapter(RegistryAdapter):
message=f"Unexpected error during {context} on Docker Hub (HTTP {resp.status_code}): {resp.text[:200]}",
)
@staticmethod
def _next_tag_page_url(resp: requests.Response) -> str | None:
def _next_tag_page_url(self, resp: requests.Response) -> str | None:
link_header = resp.headers.get("Link", "")
if not link_header:
return None
match = re.search(r'<([^>]+)>;\s*rel="next"', link_header)
if match:
next_url = match.group(1)
if next_url.startswith("/"):
return f"{_REGISTRY_HOST}{next_url}"
return next_url
return None
if not match:
return None
next_url = match.group(1)
if next_url.startswith("/"):
next_url = f"{_REGISTRY_HOST}{next_url}"
return self._validate_outbound_url(next_url, origin_url=_REGISTRY_HOST)
@@ -3,7 +3,6 @@
from __future__ import annotations
import base64
import ipaddress
import re
from typing import TYPE_CHECKING
from urllib.parse import urlparse
@@ -43,6 +42,9 @@ class OciRegistryAdapter(RegistryAdapter):
url = f"https://{url}"
return url
def _origin_url(self) -> str:
return self._base_url
def list_repositories(self) -> list[str]:
self._ensure_auth()
repositories: list[str] = []
@@ -127,8 +129,9 @@ class OciRegistryAdapter(RegistryAdapter):
file=__file__,
message=f"Cannot parse token endpoint from registry {self.registry_url}. Www-Authenticate: {www_authenticate[:200]}",
)
realm = match.group(1)
self._validate_realm_url(realm)
realm = self._validate_outbound_url(match.group(1))
if urlparse(realm).scheme == "http":
logger.warning(f"Bearer token realm uses HTTP (not HTTPS): {realm}")
params: dict = {}
service_match = re.search(r'service="([^"]+)"', www_authenticate)
if service_match:
@@ -156,27 +159,6 @@ class OciRegistryAdapter(RegistryAdapter):
)
return token
@staticmethod
def _validate_realm_url(realm: str) -> None:
parsed = urlparse(realm)
if parsed.scheme not in ("http", "https"):
raise ImageRegistryAuthError(
file=__file__,
message=f"Bearer token realm has disallowed scheme: {parsed.scheme}. Only http/https are allowed.",
)
if parsed.scheme == "http":
logger.warning(f"Bearer token realm uses HTTP (not HTTPS): {realm}")
hostname = parsed.hostname or ""
try:
addr = ipaddress.ip_address(hostname)
if addr.is_private or addr.is_loopback or addr.is_link_local:
raise ImageRegistryAuthError(
file=__file__,
message=f"Bearer token realm points to a private/loopback address: {hostname}. This may indicate an SSRF attempt.",
)
except ValueError:
pass
def _resolve_basic_credentials(self) -> tuple[str | None, str | None]:
"""Decode pre-encoded base64 auth tokens (e.g., from aws ecr get-authorization-token).
@@ -206,14 +188,24 @@ class OciRegistryAdapter(RegistryAdapter):
def _do_authed_request(self, method: str, url: str, **kwargs) -> requests.Response:
headers = kwargs.pop("headers", {})
if self._bearer_token:
headers["Authorization"] = f"Bearer {self._bearer_token}"
elif self.username and self.password:
user, pwd = self._resolve_basic_credentials()
kwargs.setdefault("auth", (user, pwd))
if self._is_same_origin_as_registry(url):
if self._bearer_token:
headers["Authorization"] = f"Bearer {self._bearer_token}"
elif self.username and self.password:
user, pwd = self._resolve_basic_credentials()
kwargs.setdefault("auth", (user, pwd))
kwargs["headers"] = headers
return self._request_with_retry(method, url, **kwargs)
def _is_same_origin_as_registry(self, url: str) -> bool:
target = urlparse(url)
origin = urlparse(self._base_url)
return (
target.scheme == origin.scheme
and (target.hostname or "") == (origin.hostname or "")
and target.port == origin.port
)
def _check_response(self, resp: requests.Response, context: str) -> None:
if resp.status_code == 200:
return
@@ -1,3 +1,4 @@
import socket
from unittest.mock import MagicMock, patch
import pytest
@@ -11,6 +12,18 @@ from prowler.providers.image.exceptions.exceptions import (
from prowler.providers.image.lib.registry.dockerhub_adapter import DockerHubAdapter
@pytest.fixture(autouse=True)
def _default_dns_resolves_public(monkeypatch):
"""Resolve every host to a public IP unless a test overrides this."""
def _stub(_host, *_a, **_kw):
return [(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("8.8.8.8", 0))]
monkeypatch.setattr(
"prowler.providers.image.lib.registry.base.socket.getaddrinfo", _stub
)
class TestDockerHubAdapterInit:
def test_extract_namespace_simple(self):
assert DockerHubAdapter._extract_namespace("docker.io/myorg") == "myorg"
@@ -241,3 +254,35 @@ class TestDockerHubEmptyTokens:
adapter = DockerHubAdapter("docker.io/myorg", username="u", password="p")
with pytest.raises(ImageRegistryAuthError, match="empty token"):
adapter._get_registry_token("myorg/myapp")
class TestDockerHubPaginationLinkValidator:
"""Server-controlled pagination links must be validated (PRWLRHELP-2103 class)."""
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_tag_pagination_to_metadata_ip_is_rejected(self, mock_request):
token_resp = MagicMock(status_code=200)
token_resp.json.return_value = {"token": "tok"}
tags_resp = MagicMock(
status_code=200,
headers={"Link": '<http://169.254.169.254/latest/meta-data>; rel="next"'},
)
tags_resp.json.return_value = {"tags": ["v1"]}
mock_request.side_effect = [token_resp, tags_resp]
adapter = DockerHubAdapter("docker.io/myorg")
with pytest.raises(ImageRegistryAuthError, match="non-public"):
adapter.list_tags("myorg/myapp")
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_tag_pagination_to_unrelated_host_is_rejected(self, mock_request):
token_resp = MagicMock(status_code=200)
token_resp.json.return_value = {"token": "tok"}
tags_resp = MagicMock(
status_code=200,
headers={"Link": '<https://attacker.com/v2/tags?n=200>; rel="next"'},
)
tags_resp.json.return_value = {"tags": ["v1"]}
mock_request.side_effect = [token_resp, tags_resp]
adapter = DockerHubAdapter("docker.io/myorg")
with pytest.raises(ImageRegistryAuthError, match="unrelated"):
adapter.list_tags("myorg/myapp")
@@ -1,4 +1,5 @@
import base64
import socket
from unittest.mock import MagicMock, patch
import pytest
@@ -12,6 +13,35 @@ from prowler.providers.image.exceptions.exceptions import (
from prowler.providers.image.lib.registry.oci_adapter import OciRegistryAdapter
def _fake_getaddrinfo(host_to_ip: dict):
"""Build a getaddrinfo stub that resolves names from host_to_ip."""
def _stub(host, *_args, **_kwargs):
if host not in host_to_ip:
raise socket.gaierror(f"unresolved host: {host}")
ip = host_to_ip[host]
family = socket.AF_INET6 if ":" in ip else socket.AF_INET
return [(family, socket.SOCK_STREAM, 0, "", (ip, 0))]
return _stub
@pytest.fixture(autouse=True)
def _default_dns_resolves_public(monkeypatch):
"""Make every host resolve to a public IP by default.
Individual tests may override with their own patch on
``prowler.providers.image.lib.registry.base.socket.getaddrinfo``.
"""
def _stub(_host, *_a, **_kw):
return [(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("8.8.8.8", 0))]
monkeypatch.setattr(
"prowler.providers.image.lib.registry.base.socket.getaddrinfo", _stub
)
class TestOciAdapterInit:
def test_normalise_url_adds_https(self):
adapter = OciRegistryAdapter("myregistry.io")
@@ -56,7 +86,7 @@ class TestOciAdapterAuth:
ping_resp = MagicMock(
status_code=401,
headers={
"Www-Authenticate": 'Bearer realm="https://auth.example.com/token",service="registry"'
"Www-Authenticate": 'Bearer realm="https://auth.reg.io/token",service="registry"'
},
)
token_resp = MagicMock(status_code=200)
@@ -288,70 +318,323 @@ class TestOciAdapterRetry:
class TestOciAdapterNextPageUrl:
def test_no_link_header(self):
adapter = OciRegistryAdapter("https://reg.io")
resp = MagicMock(headers={})
assert OciRegistryAdapter._next_page_url(resp) is None
assert adapter._next_page_url(resp) is None
def test_link_header_with_next(self):
adapter = OciRegistryAdapter("https://reg.io")
resp = MagicMock(
headers={"Link": '<https://reg.io/v2/_catalog?n=200&last=b>; rel="next"'}
)
assert (
OciRegistryAdapter._next_page_url(resp)
== "https://reg.io/v2/_catalog?n=200&last=b"
)
with patch(
"prowler.providers.image.lib.registry.base.socket.getaddrinfo",
side_effect=_fake_getaddrinfo({"reg.io": "8.8.8.8"}),
):
assert (
adapter._next_page_url(resp)
== "https://reg.io/v2/_catalog?n=200&last=b"
)
def test_link_header_relative_url(self):
adapter = OciRegistryAdapter("https://reg.io")
resp = MagicMock(
headers={"Link": '</v2/_catalog?n=200&last=b>; rel="next"'},
url="https://reg.io/v2/_catalog?n=200",
)
assert (
OciRegistryAdapter._next_page_url(resp)
== "https://reg.io/v2/_catalog?n=200&last=b"
)
with patch(
"prowler.providers.image.lib.registry.base.socket.getaddrinfo",
side_effect=_fake_getaddrinfo({"reg.io": "8.8.8.8"}),
):
assert (
adapter._next_page_url(resp)
== "https://reg.io/v2/_catalog?n=200&last=b"
)
def test_link_header_no_next(self):
adapter = OciRegistryAdapter("https://reg.io")
resp = MagicMock(
headers={"Link": '<https://reg.io/v2/_catalog?n=200>; rel="prev"'}
)
assert OciRegistryAdapter._next_page_url(resp) is None
assert adapter._next_page_url(resp) is None
class TestOciAdapterSSRF:
class TestOutboundUrlValidator:
"""Centralized SSRF defense (PRWLRHELP-2103).
Layers under test:
A. Parser unification — validator and connector use the same parser.
B. DNS resolution — reject hostnames pointing to non-public IPs.
C. Same registrable-domain — reject realm/pagination on unrelated hosts.
"""
# --- A: scheme + literal IP rejection ---
def test_reject_file_scheme(self):
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryAuthError, match="disallowed scheme"):
adapter._validate_realm_url("file:///etc/passwd")
adapter = OciRegistryAdapter("reg.example.com")
with pytest.raises(ImageRegistryAuthError, match="scheme"):
adapter._validate_outbound_url("file:///etc/passwd")
def test_reject_ftp_scheme(self):
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryAuthError, match="disallowed scheme"):
adapter._validate_realm_url("ftp://evil.com/token")
adapter = OciRegistryAdapter("reg.example.com")
with pytest.raises(ImageRegistryAuthError, match="scheme"):
adapter._validate_outbound_url("ftp://reg.example.com/token")
def test_reject_private_ip(self):
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryAuthError, match="private/loopback"):
adapter._validate_realm_url("https://10.0.0.1/token")
def test_reject_private_ip_literal(self):
adapter = OciRegistryAdapter("reg.example.com")
with pytest.raises(ImageRegistryAuthError, match="non-public"):
adapter._validate_outbound_url("https://10.0.0.1/token")
def test_reject_loopback(self):
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryAuthError, match="private/loopback"):
adapter._validate_realm_url("https://127.0.0.1/token")
def test_reject_loopback_ip_literal(self):
adapter = OciRegistryAdapter("reg.example.com")
with pytest.raises(ImageRegistryAuthError, match="non-public"):
adapter._validate_outbound_url("https://127.0.0.1/token")
def test_reject_link_local(self):
adapter = OciRegistryAdapter("reg.io")
with pytest.raises(ImageRegistryAuthError, match="private/loopback"):
adapter._validate_realm_url("https://169.254.169.254/latest/meta-data")
def test_reject_link_local_ip_literal(self):
adapter = OciRegistryAdapter("reg.example.com")
with pytest.raises(ImageRegistryAuthError, match="non-public"):
adapter._validate_outbound_url("https://169.254.169.254/latest/meta-data")
def test_accept_public_https(self):
adapter = OciRegistryAdapter("reg.io")
# Should not raise
adapter._validate_realm_url("https://auth.example.com/token")
def test_reject_ipv6_loopback(self):
adapter = OciRegistryAdapter("reg.example.com")
with pytest.raises(ImageRegistryAuthError, match="non-public"):
adapter._validate_outbound_url("https://[::1]/token")
def test_accept_hostname_not_ip(self):
adapter = OciRegistryAdapter("reg.io")
# Hostnames (not IPs) should pass even if they resolve to private IPs
adapter._validate_realm_url("https://internal.corp.com/token")
# --- A: parser-mismatch bypass (the headlining PRWLRHELP-2103 PoC) ---
def test_reject_parser_mismatch_bypass(self):
"""Reporter PoC: the literal URL parses two different ways.
urlparse() sees host = 180.101.51.73 (public, would have been allowed)
requests connects to 127.0.0.1:6666 (loopback)
The validator must canonicalise via PreparedRequest and reject.
"""
adapter = OciRegistryAdapter("reg.example.com")
with pytest.raises(ImageRegistryAuthError, match="non-public"):
adapter._validate_outbound_url(
"http://127.0.0.1:6666\\\\@180.101.51.73/token",
enforce_origin=False,
)
# --- B: DNS resolution to non-public IPs ---
def test_reject_hostname_resolving_to_loopback(self):
adapter = OciRegistryAdapter("https://reg.example.com")
with patch(
"prowler.providers.image.lib.registry.base.socket.getaddrinfo",
side_effect=_fake_getaddrinfo(
{"reg.example.com": "8.8.8.8", "localhost": "127.0.0.1"}
),
):
with pytest.raises(ImageRegistryAuthError, match="non-public"):
adapter._validate_outbound_url(
"https://localhost/token", enforce_origin=False
)
def test_reject_hostname_resolving_to_metadata_ip(self):
adapter = OciRegistryAdapter("https://reg.example.com")
with patch(
"prowler.providers.image.lib.registry.base.socket.getaddrinfo",
side_effect=_fake_getaddrinfo(
{
"reg.example.com": "8.8.8.8",
"metadata.aws.internal": "169.254.169.254",
}
),
):
with pytest.raises(ImageRegistryAuthError, match="non-public"):
adapter._validate_outbound_url(
"https://metadata.aws.internal/", enforce_origin=False
)
def test_unresolvable_host_passes_validator(self):
"""getaddrinfo failure is not the validator's concern — let requests fail naturally."""
adapter = OciRegistryAdapter("https://reg.example.com")
with patch(
"prowler.providers.image.lib.registry.base.socket.getaddrinfo",
side_effect=socket.gaierror("nope"),
):
# Same eTLD+1, unresolvable — validator should not raise.
adapter._validate_outbound_url(
"https://nx.example.com/token", enforce_origin=True
)
# --- C: same registrable-domain enforcement ---
def test_accept_same_etld1(self):
adapter = OciRegistryAdapter("https://registry-1.docker.io")
with patch(
"prowler.providers.image.lib.registry.base.socket.getaddrinfo",
side_effect=_fake_getaddrinfo(
{"registry-1.docker.io": "8.8.8.8", "auth.docker.io": "8.8.4.4"}
),
):
canonical = adapter._validate_outbound_url("https://auth.docker.io/token")
assert canonical == "https://auth.docker.io/token"
def test_accept_same_host(self):
adapter = OciRegistryAdapter("https://ghcr.io")
with patch(
"prowler.providers.image.lib.registry.base.socket.getaddrinfo",
side_effect=_fake_getaddrinfo({"ghcr.io": "8.8.8.8"}),
):
adapter._validate_outbound_url("https://ghcr.io/token")
def test_reject_unrelated_host(self):
adapter = OciRegistryAdapter("https://registry-1.docker.io")
with patch(
"prowler.providers.image.lib.registry.base.socket.getaddrinfo",
side_effect=_fake_getaddrinfo(
{"registry-1.docker.io": "8.8.8.8", "attacker.com": "1.1.1.1"}
),
):
with pytest.raises(ImageRegistryAuthError, match="unrelated"):
adapter._validate_outbound_url("https://attacker.com/token")
def test_enforce_origin_false_allows_unrelated_public_host(self):
"""When validating the registry URL itself (the trust anchor),
we don't compare it to itself — pass enforce_origin=False."""
adapter = OciRegistryAdapter("https://reg.example.com")
with patch(
"prowler.providers.image.lib.registry.base.socket.getaddrinfo",
side_effect=_fake_getaddrinfo({"public.elsewhere.io": "8.8.8.8"}),
):
adapter._validate_outbound_url(
"https://public.elsewhere.io/", enforce_origin=False
)
# --- Returns canonical URL for the caller to use ---
def test_returns_canonical_url(self):
adapter = OciRegistryAdapter("https://ghcr.io")
with patch(
"prowler.providers.image.lib.registry.base.socket.getaddrinfo",
side_effect=_fake_getaddrinfo({"ghcr.io": "8.8.8.8"}),
):
canonical = adapter._validate_outbound_url("https://ghcr.io/token")
assert canonical == "https://ghcr.io/token"
class TestObtainBearerTokenAppliesValidator:
"""Integration: malicious Www-Authenticate realm must be rejected before the second call."""
@patch(
"prowler.providers.image.lib.registry.base.socket.getaddrinfo",
side_effect=_fake_getaddrinfo({"reg.example.com": "8.8.8.8"}),
)
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_realm_with_parser_mismatch_payload_is_rejected(
self, mock_request, _mock_dns
):
ping_resp = MagicMock(
status_code=401,
headers={
"Www-Authenticate": (
'Bearer realm="http://127.0.0.1:6666\\\\@180.101.51.73/token",'
'service="registry"'
)
},
)
mock_request.return_value = ping_resp
adapter = OciRegistryAdapter("https://reg.example.com")
with pytest.raises(ImageRegistryAuthError):
adapter._ensure_auth()
# Only the ping should have happened — not the realm GET.
assert mock_request.call_count == 1
@patch(
"prowler.providers.image.lib.registry.base.socket.getaddrinfo",
side_effect=_fake_getaddrinfo({"reg.example.com": "8.8.8.8"}),
)
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_realm_pointing_to_unrelated_host_is_rejected(
self, mock_request, _mock_dns
):
ping_resp = MagicMock(
status_code=401,
headers={"Www-Authenticate": 'Bearer realm="https://attacker.com/token"'},
)
mock_request.return_value = ping_resp
adapter = OciRegistryAdapter("https://reg.example.com")
with pytest.raises(ImageRegistryAuthError, match="unrelated"):
adapter._ensure_auth()
assert mock_request.call_count == 1
class TestPaginationLinkValidator:
"""The Link: rel=next URL is server-controlled and must go through the validator."""
@patch(
"prowler.providers.image.lib.registry.base.socket.getaddrinfo",
side_effect=_fake_getaddrinfo({"reg.example.com": "8.8.8.8"}),
)
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_oci_pagination_to_unrelated_host_is_rejected(
self, mock_request, _mock_dns
):
ping_resp = MagicMock(status_code=200)
catalog_page = MagicMock(
status_code=200,
headers={"Link": '<https://attacker.com/v2/_catalog?n=200>; rel="next"'},
)
catalog_page.json.return_value = {"repositories": ["a"]}
mock_request.side_effect = [ping_resp, catalog_page]
adapter = OciRegistryAdapter("https://reg.example.com")
with pytest.raises(ImageRegistryAuthError, match="unrelated"):
adapter.list_repositories()
@patch(
"prowler.providers.image.lib.registry.base.socket.getaddrinfo",
side_effect=_fake_getaddrinfo({"reg.example.com": "8.8.8.8"}),
)
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_oci_pagination_to_metadata_ip_is_rejected(self, mock_request, _mock_dns):
ping_resp = MagicMock(status_code=200)
catalog_page = MagicMock(
status_code=200,
headers={"Link": '<http://169.254.169.254/latest/meta-data>; rel="next"'},
)
catalog_page.json.return_value = {"repositories": ["a"]}
mock_request.side_effect = [ping_resp, catalog_page]
adapter = OciRegistryAdapter("https://reg.example.com")
with pytest.raises(ImageRegistryAuthError, match="non-public"):
adapter.list_repositories()
class TestCrossOriginAuthorizationStrip:
"""Bearer/Basic credentials must not leak to a host different from the registry."""
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_bearer_not_sent_to_different_host(self, mock_request):
resp_200 = MagicMock(status_code=200)
mock_request.return_value = resp_200
adapter = OciRegistryAdapter("https://reg.example.com")
adapter._bearer_token = "secret-bearer"
# _do_authed_request — call with a different host
adapter._do_authed_request("GET", "https://other.example.com/v2/_catalog")
sent_headers = mock_request.call_args.kwargs.get("headers", {})
assert "Authorization" not in sent_headers
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_bearer_is_sent_to_same_host(self, mock_request):
resp_200 = MagicMock(status_code=200)
mock_request.return_value = resp_200
adapter = OciRegistryAdapter("https://reg.example.com")
adapter._bearer_token = "secret-bearer"
adapter._do_authed_request("GET", "https://reg.example.com/v2/_catalog")
sent_headers = mock_request.call_args.kwargs.get("headers", {})
assert sent_headers.get("Authorization") == "Bearer secret-bearer"
@patch("prowler.providers.image.lib.registry.base.requests.request")
def test_basic_auth_not_sent_to_different_host(self, mock_request):
resp_200 = MagicMock(status_code=200)
mock_request.return_value = resp_200
adapter = OciRegistryAdapter(
"https://reg.example.com", username="u", password="p"
)
adapter._basic_auth_verified = True
adapter._do_authed_request("GET", "https://other.example.com/v2/_catalog")
assert mock_request.call_args.kwargs.get("auth") is None
class TestOciAdapterEmptyToken:
@@ -360,7 +643,7 @@ class TestOciAdapterEmptyToken:
ping_resp = MagicMock(
status_code=401,
headers={
"Www-Authenticate": 'Bearer realm="https://auth.example.com/token",service="registry"'
"Www-Authenticate": 'Bearer realm="https://auth.reg.io/token",service="registry"'
},
)
token_resp = MagicMock(status_code=200)
@@ -375,7 +658,7 @@ class TestOciAdapterEmptyToken:
ping_resp = MagicMock(
status_code=401,
headers={
"Www-Authenticate": 'Bearer realm="https://auth.example.com/token",service="registry"'
"Www-Authenticate": 'Bearer realm="https://auth.reg.io/token",service="registry"'
},
)
token_resp = MagicMock(status_code=200)