chore(revision): solve comments

This commit is contained in:
pedrooot
2026-06-18 13:04:44 +02:00
parent 510da0154a
commit 1673bdf0a6
12 changed files with 142 additions and 71 deletions
+7 -7
View File
@@ -40,19 +40,19 @@ When adding a new configurable check to Prowler, update the following files:
# aws.awslambda_function_vpc_multi_az
lambda_min_azs: 2
```
- **Provider Schema:** Add the typed field to the provider's Pydantic schema in `prowler/config/schema/<provider>.py`. This is required: the loader validates user configs against these schemas and the shipped `config.yaml` must round-trip with zero warnings. See [Adding a parameter to the provider schema](#adding-a-parameter-to-the-provider-schema) below.
- **Provider Schema:** Add the typed field to the provider's Pydantic schema in `prowler/config/schema/<provider>.py`. This is required: the loader validates user configs against these schemas and the shipped `config.yaml` must round-trip with zero warnings. See [Adding a Parameter to the Provider Schema](#adding-a-parameter-to-the-provider-schema) below.
- **Test Fixtures:** If tests depend on this configuration, add the variable to `tests/config/fixtures/config.yaml`.
- **Documentation:** Document the new variable in the list of configurable checks in `docs/tutorials/configuration_file.md`.
For a complete list of checks that already support configuration, see the [Configuration File Tutorial](/user-guide/cli/tutorials/configuration_file).
## Adding a parameter to the provider schema
## Adding a Parameter to the Provider Schema
Every provider has a typed Pydantic schema in `prowler/config/schema/`. When a config is loaded, `validate_provider_config` checks each user-supplied key against the schema, logs a warning, and drops any field that fails validation. The consumer's `.get(key, default)` then falls back to the built-in default.
This catches typos in a value (for example, `0.2` typed as `20`, or `"medium"` for an enum that expects `"MEDIUM"`). It does NOT catch typos in a key name: `disalowed_regions` (one `l` missing) is treated as an unknown key and passes through untouched, because third-party check plugins legitimately rely on unknown keys being preserved. Reviewers should still check that any new key the YAML adds is named exactly the same as the field on the schema.
### Where to add the field
### Where to Add the Field
1. Open `prowler/config/schema/<provider>.py` (for example, `aws.py`).
2. Add a field on the provider's schema class. Always make it `Optional[...] = None` so the absence of the key is valid.
@@ -60,7 +60,7 @@ This catches typos in a value (for example, `0.2` typed as `20`, or `"medium"` f
If you are introducing an entirely new provider rather than a new parameter, also add an entry mapping the provider name to its schema class in `prowler/config/schema/registry.py`. The loader uses that registry to find the schema for the provider it is loading.
### Choosing the right type
### Choosing the Right Type
| Value kind | Field declaration |
|---|---|
@@ -73,7 +73,7 @@ If you are introducing an entirely new provider rather than a new parameter, als
Prefer `Literal[...]` over `str` whenever the value is one of a known set. Prefer `Field(gt=0)` over `int` whenever zero or negative would be nonsensical. The point of the schema is to catch real-world mistakes that previously passed silently.
### Custom validators (only when needed)
### Custom Validators (Only When Needed)
If the value has structural rules beyond type and range, add a `field_validator`. Examples already in `aws.py`:
@@ -83,7 +83,7 @@ If the value has structural rules beyond type and range, add a `field_validator`
Raise `ValueError` from the validator. The framework converts the error into a warning and drops the offending key.
### Example: adding a new parameter
### Example: Adding a New Parameter
Say a new check needs `max_iam_role_session_hours`, a strictly positive integer that defaults to 12 in code.
@@ -105,7 +105,7 @@ Say a new check needs `max_iam_role_session_hours`, a strictly positive integer
- one test for a valid value that round-trips,
- one test for an invalid value (zero, negative, wrong type) that is dropped.
### What the loader guarantees
### What the Loader Guarantees
- **Unknown keys pass through.** Third-party check plugins can introduce arbitrary keys without schema edits; they will not be filtered.
- **Invalid values never crash the run.** They produce a single warning per field and the key is dropped.
+11 -1
View File
@@ -27,7 +27,17 @@ from prowler.config.schema.registry import SCHEMAS
def _format_loc(loc: tuple) -> str:
"""Render a Pydantic error location as `key[idx].nested`."""
"""Render a Pydantic error location as a dot-separated path.
Integer elements (array indices) are formatted as `[idx]` appended to the
previous component. String elements are joined with dots. An empty location
is rendered as `<root>`.
Examples:
("aws", "regions", 0) -> "aws.regions[0]"
("aws", "threshold") -> "aws.threshold"
() -> "<root>"
"""
parts: list[str] = []
for piece in loc:
if isinstance(piece, int):
+14 -41
View File
@@ -12,12 +12,16 @@ thresholds) and avoids ints that obviously break downstream maths
(`min_kinesis_stream_retention_hours = 99999`).
"""
from ipaddress import ip_network
from typing import Annotated, Literal, Optional
from pydantic import AfterValidator, Field
from prowler.config.schema.base import ProviderConfigBase
from prowler.config.schema.validators import (
make_dotted_version_validator,
validate_ip_networks,
validate_port_range,
)
# ---- Reusable constants -----------------------------------------------------
@@ -39,6 +43,7 @@ _CLOUDWATCH_RETENTION_DAYS = (
400,
545,
731,
1096,
1827,
2192,
2557,
@@ -63,6 +68,7 @@ _VALID_CW_RETENTION_LITERAL = Literal[
400,
545,
731,
1096,
1827,
2192,
2557,
@@ -75,13 +81,13 @@ _VALID_CW_RETENTION_LITERAL = Literal[
# ---- Custom validators ------------------------------------------------------
def _validate_port_range(v: Optional[list[int]]) -> Optional[list[int]]:
if v is None:
return v
for port in v:
if not 1 <= port <= 65535:
raise ValueError(f"port {port} is outside the valid range 1..65535")
return v
# Reusable validators shared across providers (see schema/validators.py).
_validate_port_range = validate_port_range
_validate_trusted_ips = validate_ip_networks
# "1.4.0" style strings (used by Fargate platform versions).
_validate_semver = make_dotted_version_validator(3, 3)
# "1.28" style strings (EKS minor versions).
_validate_eks_minor = make_dotted_version_validator(2, 2)
def _validate_account_ids(v: Optional[list[str]]) -> Optional[list[str]]:
@@ -95,39 +101,6 @@ def _validate_account_ids(v: Optional[list[str]]) -> Optional[list[str]]:
return v
def _validate_trusted_ips(v: Optional[list[str]]) -> Optional[list[str]]:
if v is None:
return v
for entry in v:
try:
ip_network(entry, strict=False)
except ValueError as exc:
raise ValueError(
f"trusted_ips entry {entry!r} is not a valid IP or CIDR ({exc})"
) from exc
return v
def _validate_semver(v: Optional[str]) -> Optional[str]:
"""Accept "1.4.0" style strings (used by Fargate platform versions)."""
if v is None:
return v
parts = v.split(".")
if len(parts) != 3 or not all(p.isdigit() for p in parts):
raise ValueError(f"{v!r} is not a valid semantic version (expected X.Y.Z)")
return v
def _validate_eks_minor(v: Optional[str]) -> Optional[str]:
"""Accept "1.28" style strings (EKS minor versions)."""
if v is None:
return v
parts = v.split(".")
if len(parts) != 2 or not all(p.isdigit() for p in parts):
raise ValueError(f"{v!r} is not a valid EKS version (expected X.Y)")
return v
# ---- Nested models ----------------------------------------------------------
+5 -13
View File
@@ -9,20 +9,12 @@ from typing import Annotated, Literal, Optional
from pydantic import AfterValidator, Field
from prowler.config.schema.base import ProviderConfigBase
from prowler.config.schema.validators import make_dotted_version_validator
def _validate_dotted_version(v: Optional[str]) -> Optional[str]:
"""Accept ``"8.2"``, ``"3.12"``, ``"17"`` style version strings.
Used by App Service language version fields where the upstream APIs
accept either ``MAJOR`` or ``MAJOR.MINOR`` notation.
"""
if v is None:
return v
parts = v.split(".")
if not (1 <= len(parts) <= 2) or not all(p.isdigit() for p in parts):
raise ValueError(f"{v!r} is not a valid version (expected 'X' or 'X.Y')")
return v
# Accept "8.2", "3.12", "17" style version strings. Used by App Service
# language version fields where the upstream APIs accept either MAJOR or
# MAJOR.MINOR notation.
_validate_dotted_version = make_dotted_version_validator(1, 2)
class AzureProviderConfig(ProviderConfigBase):
+6
View File
@@ -8,6 +8,12 @@ from prowler.config.schema.base import ProviderConfigBase
class CloudflareProviderConfig(ProviderConfigBase):
"""Cloudflare provider configuration schema.
Defines optional configuration parameters for Cloudflare security checks,
including API retry behavior.
"""
max_retries: Optional[int] = Field(
default=None,
ge=0,
+6
View File
@@ -8,6 +8,12 @@ from prowler.config.schema.base import ProviderConfigBase
class MongoDBAtlasProviderConfig(ProviderConfigBase):
"""MongoDB Atlas provider configuration schema.
Defines optional configuration parameters for MongoDB Atlas security checks,
including service account secret validity constraints.
"""
max_service_account_secret_validity_hours: Optional[int] = Field(
default=None,
ge=1,
+5
View File
@@ -49,6 +49,11 @@ def validate_provider_config(
)
cleaned = {k: v for k, v in raw.items() if k not in bad_keys}
# Retry validation with the cleaned dict. Dropping invalid keys handles
# common field-level mismatches, but revalidation can still fail due to
# higher-level structural constraints (e.g. nested validation errors not
# captured in the top-level bad_keys). In that case, log and return the
# cleaned dict so consumers fall back to their own defaults.
try:
model = schema_cls.model_validate(cleaned)
return model.model_dump(exclude_unset=True)
+71
View File
@@ -0,0 +1,71 @@
"""Reusable field validators shared across provider config schemas.
These are factored out so multiple providers can reuse the same validation
logic (version strings, port ranges, IP/CIDR entries) instead of duplicating
it per schema. Each validator accepts ``None`` so optional fields stay valid
when the key is absent.
"""
from ipaddress import ip_network
from typing import Callable, Optional
_VERSION_PART_LABELS = ("X", "Y", "Z", "W")
def make_dotted_version_validator(
min_parts: int, max_parts: int
) -> Callable[[Optional[str]], Optional[str]]:
"""Build a validator for dotted numeric version strings.
The returned validator accepts ``None`` and strings made of between
``min_parts`` and ``max_parts`` dot-separated numeric components. Anything
else raises ``ValueError``.
Examples:
``make_dotted_version_validator(3, 3)`` accepts ``"1.4.0"`` (semver).
``make_dotted_version_validator(2, 2)`` accepts ``"1.28"`` (EKS minor).
``make_dotted_version_validator(1, 2)`` accepts ``"17"`` or ``"8.2"``.
"""
if min_parts == max_parts:
expected = ".".join(_VERSION_PART_LABELS[:min_parts])
else:
expected = " or ".join(
f"'{'.'.join(_VERSION_PART_LABELS[:n])}'"
for n in range(min_parts, max_parts + 1)
)
def _validate(v: Optional[str]) -> Optional[str]:
if v is None:
return v
parts = v.split(".")
if not (min_parts <= len(parts) <= max_parts) or not all(
p.isdigit() for p in parts
):
raise ValueError(f"{v!r} is not a valid version (expected {expected})")
return v
return _validate
def validate_port_range(v: Optional[list[int]]) -> Optional[list[int]]:
"""Reject ports outside the valid ``1..65535`` range."""
if v is None:
return v
for port in v:
if not 1 <= port <= 65535:
raise ValueError(f"port {port} is outside the valid range 1..65535")
return v
def validate_ip_networks(v: Optional[list[str]]) -> Optional[list[str]]:
"""Reject entries that are not a valid IP address or CIDR network."""
if v is None:
return v
for entry in v:
try:
ip_network(entry, strict=False)
except ValueError as exc:
raise ValueError(
f"entry {entry!r} is not a valid IP or CIDR ({exc})"
) from exc
return v
+7
View File
@@ -8,6 +8,13 @@ from prowler.config.schema.base import ProviderConfigBase
class VercelProviderConfig(ProviderConfigBase):
"""Vercel provider configuration schema.
Defines optional configuration parameters for Vercel security checks,
including deployment branch policies, credential staleness thresholds,
RBAC ownership limits, and secret detection patterns.
"""
stable_branches: Optional[list[str]] = Field(
default=None,
description="Branches considered stable for production deployments.",
+7 -7
View File
@@ -185,14 +185,14 @@ class Test_AWS_Booleans:
def test_yaml_style_boolean_coercion(self):
# YAML can produce Python str "true"/"yes" if the user quoted it.
# Pydantic v2 will refuse string booleans by default. Verify it is
# dropped, not silently treated as True (which would be dangerous
# for verify_premium_support_plans).
# Pydantic v2 deterministically coerces "yes"/"no"/"true"/"false" to a
# real bool in lax mode, so the value is normalized rather than passed
# through as a string (which would be dangerous for
# verify_premium_support_plans).
out = _validate({"verify_premium_support_plans": "yes"})
# Pydantic actually DOES coerce "yes"/"no"/"true"/"false" in lax mode.
# We accept either outcome but require it to be a real bool.
if "verify_premium_support_plans" in out:
assert isinstance(out["verify_premium_support_plans"], bool)
assert "verify_premium_support_plans" in out
assert isinstance(out["verify_premium_support_plans"], bool)
assert out["verify_premium_support_plans"] is True
class Test_AWS_Full_Default_Config_Round_Trips:
+1 -1
View File
@@ -15,7 +15,7 @@ import pytest
from prowler.config.scan_config_schema import validate_scan_config
def _has_error_for(errors, path_substr: str) -> bool:
def _has_error_for(errors: list[dict], path_substr: str) -> bool:
return any(path_substr in e["path"] for e in errors)
@@ -5,6 +5,7 @@ through a temp YAML file. Anything that breaks here would break the actual
import logging
import os
import pathlib
from typing import Callable
import pytest
@@ -12,7 +13,7 @@ from prowler.config.config import load_and_validate_config_file
@pytest.fixture
def write_config(tmp_path):
def write_config(tmp_path: pathlib.Path) -> Callable[[str], str]:
def _write(content: str) -> str:
path = tmp_path / "config.yaml"
path.write_text(content)