Compare commits


4 Commits

Author SHA1 Message Date
pedrooot 510da0154a chore(merge): master 2026-06-09 17:59:00 +02:00
pedrooot 88926cc052 chore(changelog): update with latest changes 2026-06-09 16:40:20 +02:00
pedrooot ed17dc4b09 feat(sdk): scan configuration schema and validation for Prowler Cloud 2026-06-09 16:34:37 +02:00
Pepe Fagoaga 1a450aaa70 feat(config): add SDK config's validator 2026-05-13 08:04:13 +02:00
23 changed files with 2162 additions and 1 deletions
@@ -40,9 +40,76 @@ When adding a new configurable check to Prowler, update the following files:
# aws.awslambda_function_vpc_multi_az
lambda_min_azs: 2
```
- **Provider Schema:** Add the typed field to the provider's Pydantic schema in `prowler/config/schema/<provider>.py`. This is required: the loader validates user configs against these schemas and the shipped `config.yaml` must round-trip with zero warnings. See [Adding a parameter to the provider schema](#adding-a-parameter-to-the-provider-schema) below.
- **Test Fixtures:** If tests depend on this configuration, add the variable to `tests/config/fixtures/config.yaml`.
- **Documentation:** Document the new variable in the list of configurable checks in `docs/tutorials/configuration_file.md`.
For a complete list of checks that already support configuration, see the [Configuration File Tutorial](/user-guide/cli/tutorials/configuration_file).
## Adding a parameter to the provider schema
Every provider has a typed Pydantic schema in `prowler/config/schema/`. When a config is loaded, `validate_provider_config` checks each user-supplied key against that schema. Any field that fails validation is logged as a warning and dropped, so the consumer's `.get(key, default)` falls back to the built-in default.
This catches typos in a value (for example, `0.2` typed as `20`, or `"medium"` for an enum that expects `"MEDIUM"`). It does NOT catch typos in a key name: `disalowed_regions` (one `l` missing) is treated as an unknown key and passes through untouched, because third-party check plugins legitimately rely on unknown keys being preserved. Reviewers should still check that any new key the YAML adds is named exactly the same as the field on the schema.
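The behavior described above can be sketched in a few lines. This is a minimal illustration, not the code in `prowler/config/schema/validator.py`: `DemoConfig` and `lenient_validate` are hypothetical names, and `print` stands in for the real logger.

```python
from typing import Any, Optional

from pydantic import BaseModel, ConfigDict, Field, ValidationError


class DemoConfig(BaseModel):
    """Hypothetical schema used only for this sketch."""

    model_config = ConfigDict(extra="allow")  # unknown keys are preserved

    threshold: Optional[float] = Field(default=None, ge=0.0, le=1.0)
    disallowed_regions: Optional[list[str]] = None


def lenient_validate(raw: dict[str, Any]) -> dict[str, Any]:
    """Drop top-level keys that fail validation and keep everything else."""
    data = dict(raw)
    try:
        model = DemoConfig.model_validate(data)
    except ValidationError as exc:
        # The first element of each error location is the offending key.
        bad_keys = {str(err["loc"][0]) for err in exc.errors() if err["loc"]}
        for key in sorted(bad_keys):
            print(f"WARNING: dropping invalid config key {key!r}")
            data.pop(key, None)
        model = DemoConfig.model_validate(data)
    dumped = model.model_dump()
    # Return only the keys the user supplied, with validated values.
    return {key: dumped[key] for key in data}


# The out-of-range value is dropped; the misspelled key passes through.
result = lenient_validate({"threshold": 20, "disalowed_regions": ["eu-west-1"]})
print(result)
```

The misspelled `disalowed_regions` survives because the schema treats it as an unknown key, which is why key names still need a manual check in review.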
### Where to add the field
1. Open `prowler/config/schema/<provider>.py` (for example, `aws.py`).
2. Add a field on the provider's schema class. Always make it `Optional[...] = None` so the absence of the key is valid.
3. Apply the tightest type the value allows. Examples below.
If you are introducing an entirely new provider rather than a new parameter, also add an entry mapping the provider name to its schema class in `prowler/config/schema/registry.py`. The loader uses that registry to find the schema for the provider it is loading.
### Choosing the right type
| Value kind | Field declaration |
|---|---|
| Boolean toggle | `Optional[bool] = None` |
| Strictly positive integer (days, counts) | `Optional[int] = Field(default=None, gt=0)` |
| Fraction in 0..1 (threshold) | `Optional[float] = Field(default=None, ge=0.0, le=1.0)` |
| Closed set of strings | `Optional[Literal["A", "B", "C"]] = None` |
| Free-form string | `Optional[str] = None` |
| List of strings or ints | `Optional[list[str]] = None` (or `Optional[list[int]] = None`) |
Prefer `Literal[...]` over `str` whenever the value is one of a known set. Prefer `Field(gt=0)` over `int` whenever zero or negative would be nonsensical. The point of the schema is to catch real-world mistakes that previously passed silently.
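To see what the tighter declarations buy, the sketch below runs a few payloads through a throwaway model. `ExampleConfig` and `failing_fields` are hypothetical names chosen for this illustration; they are not part of the Prowler codebase.

```python
from typing import Literal, Optional

from pydantic import BaseModel, Field, ValidationError


class ExampleConfig(BaseModel):
    """Hypothetical model with one field per row of the table above."""

    enabled: Optional[bool] = None
    max_days: Optional[int] = Field(default=None, gt=0)
    threshold: Optional[float] = Field(default=None, ge=0.0, le=1.0)
    severity: Optional[Literal["LOW", "MEDIUM", "HIGH"]] = None
    regions: Optional[list[str]] = None


def failing_fields(payload: dict) -> list[str]:
    """Return the sorted names of the fields that fail validation."""
    try:
        ExampleConfig.model_validate(payload)
    except ValidationError as exc:
        return sorted({str(err["loc"][0]) for err in exc.errors()})
    return []


# Zero days, a percentage instead of a fraction, and a lower-cased enum value.
print(failing_fields({"max_days": 0, "threshold": 20, "severity": "medium"}))
# A well-formed payload produces no failures.
print(failing_fields({"enabled": True, "max_days": 90, "threshold": 0.2}))
```

With plain `int`, `float`, and `str` annotations, all three mistakes in the first payload would validate without complaint.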
### Custom validators (only when needed)
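If the value has structural rules beyond type and range, add a custom validator: either a `field_validator`, or an `AfterValidator` attached through `Annotated`, which is how the existing ones in `aws.py` are wired. Examples already in `aws.py`:
- `_validate_port_range` rejects ports outside `1..65535` (port 0 is reserved).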
- `_validate_account_ids` rejects anything that isn't a 12-digit AWS account ID.
- `_validate_trusted_ips` rejects entries that aren't a valid IP or CIDR.
Raise `ValueError` from the validator. The framework converts the error into a warning and drops the offending key.
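A custom validator following the same pattern as the ones in `aws.py` might look like the sketch below. The field `demo_regions`, the helper `_validate_region_codes`, and the regular expression are all hypothetical and exist only to illustrate the shape.

```python
import re
from typing import Annotated, Optional

from pydantic import AfterValidator, BaseModel, ValidationError

# Hypothetical pattern: "eu-west-1", "us-gov-west-1", and similar codes.
_REGION_RE = re.compile(r"[a-z]{2}(-[a-z]+)+-\d+")


def _validate_region_codes(v: Optional[list[str]]) -> Optional[list[str]]:
    """Reject entries that do not look like a region code."""
    if v is None:
        return v
    for region in v:
        if not _REGION_RE.fullmatch(region):
            raise ValueError(f"{region!r} does not look like a region code")
    return v


class DemoConfig(BaseModel):
    demo_regions: Annotated[
        Optional[list[str]], AfterValidator(_validate_region_codes)
    ] = None


def first_error(payload: dict) -> Optional[str]:
    """Return the first validation message, or None when the payload is valid."""
    try:
        DemoConfig.model_validate(payload)
    except ValidationError as exc:
        return exc.errors()[0]["msg"]
    return None


print(first_error({"demo_regions": ["eu-west-1"]}))
print(first_error({"demo_regions": ["EU West"]}))
```

Pydantic wraps the `ValueError` in a `ValidationError`, so the validator itself never needs to know whether the caller is lenient or strict.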
### Example: adding a new parameter
Say a new check needs `max_iam_role_session_hours`, a strictly positive integer that defaults to 12 in code.
1. **Schema** (`prowler/config/schema/aws.py`):
```python
# IAM
max_iam_role_session_hours: Optional[int] = Field(default=None, gt=0)
```
2. **Shipped config** (`prowler/config/config.yaml`):
```yaml
# aws.iam_role_session_duration_within_limit
max_iam_role_session_hours: 12
```
3. **Consumer** (the check):
```python
max_hours = iam_client.audit_config.get("max_iam_role_session_hours", 12)
```
4. **Tests** in `tests/config/schema/aws_schema_test.py`:
- one test for a valid value that round-trips,
- one test for an invalid value (zero, negative, wrong type) that is dropped.
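The two tests from step 4 could be shaped roughly as follows. This sketch uses a stand-in model (`DemoAWSConfig`, hypothetical) so that it runs on its own; real tests would presumably import the AWS schema class and go through `validate_provider_config`, asserting that the invalid key is absent from the returned dict.

```python
from typing import Optional

from pydantic import BaseModel, ConfigDict, Field, ValidationError


class DemoAWSConfig(BaseModel):
    """Stand-in for the real AWS schema class; hypothetical."""

    model_config = ConfigDict(extra="allow")

    max_iam_role_session_hours: Optional[int] = Field(default=None, gt=0)


def test_valid_value_round_trips():
    cfg = DemoAWSConfig.model_validate({"max_iam_role_session_hours": 12})
    assert cfg.model_dump() == {"max_iam_role_session_hours": 12}


def test_invalid_values_are_rejected():
    # Zero, a negative number, and a value of the wrong type.
    for bad in (0, -3, "twelve"):
        try:
            DemoAWSConfig.model_validate({"max_iam_role_session_hours": bad})
        except ValidationError as exc:
            assert exc.errors()[0]["loc"] == ("max_iam_role_session_hours",)
        else:
            raise AssertionError(f"{bad!r} should have been rejected")
```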
### What the loader guarantees
- **Unknown keys pass through.** Third-party check plugins can introduce arbitrary keys without schema edits; they will not be filtered.
- **Invalid values never crash the run.** They produce a single warning per field and the key is dropped.
- **Coerced values are normalized.** A YAML-quoted `"180"` for an `int` field arrives downstream as the integer `180`.
- **The shipped `config.yaml` must round-trip cleanly.** The integration test `test_shipped_default_config_loads_without_warnings` will fail if a key is added to the YAML without a matching schema field, so the two stay in sync.
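The coercion and pass-through guarantees can be observed directly on a Pydantic model configured like the provider schemas. The model below is a hypothetical stand-in that borrows one field name from the AWS schema, and `my_plugin_key` is an invented third-party key.

```python
from typing import Optional

from pydantic import BaseModel, ConfigDict, Field


class DemoConfig(BaseModel):
    """Hypothetical stand-in for a provider schema."""

    model_config = ConfigDict(extra="allow")

    max_unused_access_keys_days: Optional[int] = Field(default=None, ge=30, le=180)


loaded = DemoConfig.model_validate(
    {
        "max_unused_access_keys_days": "180",  # quoted in the YAML
        "my_plugin_key": {"nested": 1},  # unknown to the schema
    }
).model_dump()

# The quoted number arrives as an int; the unknown key is left untouched.
print(loaded)
```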
This approach ensures that checks are easily configurable, making Prowler highly adaptable to different environments and requirements.
+1
View File
@@ -128,6 +128,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `entra_service_principal_no_secrets_for_permanent_tier0_roles` check for M365 provider [(#10788)](https://github.com/prowler-cloud/prowler/pull/10788)
- `iam_user_access_not_stale_to_sagemaker` check for AWS provider with configurable `max_unused_sagemaker_access_days` (default 90) [(#11000)](https://github.com/prowler-cloud/prowler/pull/11000)
- `cloudtrail_bedrock_logging_enabled` check for AWS provider [(#10858)](https://github.com/prowler-cloud/prowler/pull/10858)
- Per-provider scan configuration schema with bounds validation that drops out-of-range values with a warning on config load [(#11518)](https://github.com/prowler-cloud/prowler/pull/11518)
- Okta provider with OAuth 2.0 authentication and `signon_global_session_idle_timeout_15min` check [(#11079)](https://github.com/prowler-cloud/prowler/pull/11079)
- `sagemaker_domain_sso_configured` check for AWS provider [(#11094)](https://github.com/prowler-cloud/prowler/pull/11094)
- Scaleway provider with `iam_api_keys_no_root_owned` check [(#11166)](https://github.com/prowler-cloud/prowler/pull/11166)
+10 -1
View File
@@ -288,6 +288,11 @@ def load_and_validate_config_file(provider: str, config_file_path: str) -> dict:
Returns:
dict: The configuration dictionary for the specified provider.
"""
# Imported lazily to avoid an import cycle: schemas may eventually want to
# import from prowler.config.config (e.g. for shared constants).
from prowler.config.schema.registry import SCHEMAS
from prowler.config.schema.validator import validate_provider_config
try:
with open(config_file_path, "r", encoding=encoding_format_utf_8) as f:
config_file = yaml.safe_load(f)
@@ -313,7 +318,11 @@ def load_and_validate_config_file(provider: str, config_file_path: str) -> dict:
else:
config = {}
- return config
return validate_provider_config(
provider=provider,
raw=config,
schema_cls=SCHEMAS.get(provider),
)
except FileNotFoundError as error:
logger.error(
+106
View File
@@ -0,0 +1,106 @@
"""Bridge between the Pydantic-based provider schemas in
`prowler.config.schema` and the Prowler App backend (Django) + UI.
The SDK runtime is intentionally LENIENT: invalid keys are dropped with a
warning and downstream checks fall back to their defaults
(`prowler.config.schema.validator.validate_provider_config`).
The Prowler App, however, needs to surface those errors to the user when
they save a Scan Config from the UI, and to expose the schema as JSON so
the UI can validate live with `ajv`. This module provides:
- `validate_scan_config(payload)` — STRICT: returns a list of
`{path, message}` errors without silently dropping anything. The DRF
serializer (`api/.../v1/serializers.py:validate_scan_config_payload`)
turns each entry into a `ValidationError`.
- `SCAN_CONFIG_SCHEMA` — aggregated JSON Schema derived from the Pydantic
models via `model_json_schema()`. Served by the `/scan-configs/schema`
endpoint and consumed by the UI editor for in-editor live validation.
"""
from typing import Any

from pydantic import ValidationError

from prowler.config.schema.registry import SCHEMAS


def _format_loc(loc: tuple) -> str:
    """Render a Pydantic error location as `key[idx].nested`."""
    parts: list[str] = []
    for piece in loc:
        if isinstance(piece, int):
            if parts:
                parts[-1] = f"{parts[-1]}[{piece}]"
            else:
                parts.append(f"[{piece}]")
        else:
            parts.append(str(piece))
    return ".".join(parts) if parts else "<root>"


def validate_scan_config(payload: Any) -> list[dict]:
    """Validate a scan config payload against the registered provider schemas.

    Strict by design: every Pydantic violation surfaces as a `{path, message}`
    entry so the caller can decide how to present it. Unknown provider
    sections are accepted (consistent with `additionalProperties: True` at
    the top level — the SDK simply has no opinion on them).
    """
    if not isinstance(payload, dict):
        return [
            {
                "path": "<root>",
                "message": "Scan config must be a mapping with provider sections.",
            }
        ]

    errors: list[dict] = []
    for provider, section in payload.items():
        schema_cls = SCHEMAS.get(provider)
        if schema_cls is None:
            # Unknown provider type: tolerated. The SDK will simply ignore it.
            continue
        if not isinstance(section, dict):
            errors.append(
                {
                    "path": str(provider),
                    "message": "section must be a mapping.",
                }
            )
            continue
        try:
            schema_cls.model_validate(section)
        except ValidationError as exc:
            for err in exc.errors():
                loc = err.get("loc") or ()
                path = _format_loc((str(provider), *loc))
                errors.append(
                    {
                        "path": path,
                        "message": err.get("msg", "validation error"),
                    }
                )
    return errors


def _build_aggregated_schema() -> dict:
    """Compose one JSON Schema per provider into a single top-level schema.

    The output mirrors the layout of `prowler/config/config.yaml` (a mapping
    keyed by provider type) and is what the UI consumes via `ajv`.
    """
    properties: dict[str, dict] = {}
    for provider, schema_cls in SCHEMAS.items():
        properties[provider] = schema_cls.model_json_schema()
    return {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "Prowler Scan Config",
        "type": "object",
        "additionalProperties": True,
        "properties": properties,
    }


SCAN_CONFIG_SCHEMA: dict = _build_aggregated_schema()
+449
View File
@@ -0,0 +1,449 @@
"""AWS provider config schema.
Bounds on every field are intentionally conservative: they are not the
absolute service maxima but the values that produce a useful security
check. A user is free to keep the built-in default by omitting the key —
out-of-range values are dropped with a warning at SDK runtime, and
rejected at the Prowler App backend.
Whenever an upper bound is uncertain, the cap is set to a value that
still keeps the check meaningful (e.g. a 10-year window for date-based
thresholds) and avoids ints that obviously break downstream maths
(`min_kinesis_stream_retention_hours = 99999`).
"""
from ipaddress import ip_network
from typing import Annotated, Literal, Optional
from pydantic import AfterValidator, Field
from prowler.config.schema.base import ProviderConfigBase
# ---- Reusable constants -----------------------------------------------------
# CloudWatch Logs only accepts these retention values (in days). Anything else
# is silently coerced to the next valid value by the API — we reject upfront.
_CLOUDWATCH_RETENTION_DAYS = (
    1,
    3,
    5,
    7,
    14,
    30,
    60,
    90,
    120,
    150,
    180,
    365,
    400,
    545,
    731,
    1096,
    1827,
    2192,
    2557,
    2922,
    3288,
    3653,
)

_VALID_CW_RETENTION_LITERAL = Literal[
    1,
    3,
    5,
    7,
    14,
    30,
    60,
    90,
    120,
    150,
    180,
    365,
    400,
    545,
    731,
    1096,
    1827,
    2192,
    2557,
    2922,
    3288,
    3653,
]
# ---- Custom validators ------------------------------------------------------
def _validate_port_range(v: Optional[list[int]]) -> Optional[list[int]]:
    if v is None:
        return v
    for port in v:
        if not 1 <= port <= 65535:
            raise ValueError(f"port {port} is outside the valid range 1..65535")
    return v


def _validate_account_ids(v: Optional[list[str]]) -> Optional[list[str]]:
    if v is None:
        return v
    for account_id in v:
        if not (account_id.isdigit() and len(account_id) == 12):
            raise ValueError(
                f"trusted_account_ids entry {account_id!r} is not a 12-digit AWS account id"
            )
    return v


def _validate_trusted_ips(v: Optional[list[str]]) -> Optional[list[str]]:
    if v is None:
        return v
    for entry in v:
        try:
            ip_network(entry, strict=False)
        except ValueError as exc:
            raise ValueError(
                f"trusted_ips entry {entry!r} is not a valid IP or CIDR ({exc})"
            ) from exc
    return v


def _validate_semver(v: Optional[str]) -> Optional[str]:
    """Accept "1.4.0" style strings (used by Fargate platform versions)."""
    if v is None:
        return v
    parts = v.split(".")
    if len(parts) != 3 or not all(p.isdigit() for p in parts):
        raise ValueError(f"{v!r} is not a valid semantic version (expected X.Y.Z)")
    return v


def _validate_eks_minor(v: Optional[str]) -> Optional[str]:
    """Accept "1.28" style strings (EKS minor versions)."""
    if v is None:
        return v
    parts = v.split(".")
    if len(parts) != 2 or not all(p.isdigit() for p in parts):
        raise ValueError(f"{v!r} is not a valid EKS version (expected X.Y)")
    return v
# ---- Nested models ----------------------------------------------------------
class _DetectSecretsPlugin(ProviderConfigBase):
    """One entry inside ``detect_secrets_plugins``.

    Only ``name`` is required by the upstream library. ``limit`` is used by
    the entropy detectors. Any other plugin-specific kwarg is preserved by
    the ``extra="allow"`` policy inherited from ProviderConfigBase.
    """

    name: str
    limit: Optional[float] = Field(
        default=None,
        ge=0.0,
        le=10.0,
        description=(
            "Entropy threshold for detect-secrets entropy plugins. Range: 0..10 "
            "(Shannon entropy is bounded by log2(256)=8; >10 is meaningless)."
        ),
    )
# ---- Main schema ------------------------------------------------------------
class AWSProviderConfig(ProviderConfigBase):
    # --- IAM ---------------------------------------------------------------
    mute_non_default_regions: Optional[bool] = None
    disallowed_regions: Optional[list[str]] = None
    max_unused_access_keys_days: Optional[int] = Field(
        default=None,
        ge=30,
        le=180,
        description=(
            "Days an IAM user access key can stay unused before being flagged. "
            "Range: 30..180 days (CIS AWS 1.13 recommends 45; NIST IA-5 ≤90)."
        ),
    )
    max_console_access_days: Optional[int] = Field(
        default=None,
        ge=30,
        le=180,
        description=(
            "Days an IAM console password can stay unused before being flagged. "
            "Range: 30..180 days (CIS AWS 1.12 recommends 45)."
        ),
    )
    max_unused_sagemaker_access_days: Optional[int] = Field(
        default=None,
        ge=7,
        le=180,
        description=(
            "Days a SageMaker user access key can stay unused. Range: 7..180 "
            "(SageMaker tokens are usually high-privilege over S3/KMS)."
        ),
    )

    # --- EC2 ---------------------------------------------------------------
    shodan_api_key: Optional[str] = Field(
        default=None,
        max_length=512,
        description="API key for Shodan lookups on EC2 public IPs.",
    )
    max_security_group_rules: Optional[int] = Field(
        default=None,
        ge=1,
        le=1000,
        description="Max ingress+egress rules per security group. AWS hard limit is 1000.",
    )
    max_ec2_instance_age_in_days: Optional[int] = Field(
        default=None,
        ge=1,
        le=1095,
        description=(
            "Days an EC2 instance can run before being flagged as old. "
            "Range: 1..1095 (3 years; instances should be refreshed for patching "
            "per NIST CM-3 — anything older is a security smell)."
        ),
    )
    ec2_allowed_interface_types: Optional[list[str]] = None
    ec2_allowed_instance_owners: Optional[list[str]] = None
    ec2_high_risk_ports: Annotated[
        Optional[list[int]], AfterValidator(_validate_port_range)
    ] = Field(
        default=None,
        description="TCP/UDP ports considered high-risk when reachable from the Internet (1..65535; port 0 is reserved).",
    )

    # --- ECS ---------------------------------------------------------------
    fargate_linux_latest_version: Annotated[
        Optional[str], AfterValidator(_validate_semver)
    ] = Field(default=None, description="Fargate Linux platform version (X.Y.Z).")
    fargate_windows_latest_version: Annotated[
        Optional[str], AfterValidator(_validate_semver)
    ] = Field(default=None, description="Fargate Windows platform version (X.Y.Z).")

    # --- Cross-account trust ----------------------------------------------
    trusted_account_ids: Annotated[
        Optional[list[str]], AfterValidator(_validate_account_ids)
    ] = Field(
        default=None,
        description="Additional 12-digit AWS account IDs trusted by cross-account checks.",
    )
    trusted_ips: Annotated[
        Optional[list[str]], AfterValidator(_validate_trusted_ips)
    ] = Field(
        default=None,
        description="IPv4/IPv6 addresses or CIDR ranges that are NOT considered public.",
    )

    # --- CloudWatch / CloudFormation --------------------------------------
    log_group_retention_days: Optional[_VALID_CW_RETENTION_LITERAL] = Field(
        default=None,
        description=(
            "Required CloudWatch Logs retention in days. Must match one of the "
            f"values accepted by the AWS API: {list(_CLOUDWATCH_RETENTION_DAYS)}."
        ),
    )
    recommended_cdk_bootstrap_version: Optional[int] = Field(
        default=None,
        ge=1,
        le=100,
        description="Min CDK bootstrap version expected on the account.",
    )

    # --- AppStream --------------------------------------------------------
    max_idle_disconnect_timeout_in_seconds: Optional[int] = Field(
        default=None,
        ge=60,
        le=1800,
        description=(
            "AppStream idle disconnect timeout (seconds). Range: 60..1800 "
            "(NIST AC-12: sensitive sessions ≤15 min — cap at 30 min)."
        ),
    )
    max_disconnect_timeout_in_seconds: Optional[int] = Field(
        default=None,
        ge=60,
        le=3600,
        description="AppStream disconnect timeout (seconds). Range: 60..3600.",
    )
    max_session_duration_seconds: Optional[int] = Field(
        default=None,
        ge=600,
        le=86400,
        description=(
            "AppStream max session duration (seconds). Range: 600..86400 "
            "(10 min .. 24 h — AWS AppStream hard limit per session)."
        ),
    )

    # --- Lambda -----------------------------------------------------------
    obsolete_lambda_runtimes: Optional[list[str]] = None
    lambda_min_azs: Optional[int] = Field(
        default=None,
        ge=1,
        le=6,
        description="Min number of AZs a VPC-bound Lambda must span. Range: 1..6.",
    )

    # --- Organizations ----------------------------------------------------
    organizations_enabled_regions: Optional[list[str]] = None
    organizations_trusted_delegated_administrators: Annotated[
        Optional[list[str]], AfterValidator(_validate_account_ids)
    ] = None
    organizations_trusted_ids: Optional[list[str]] = None

    # --- ECR --------------------------------------------------------------
    ecr_repository_vulnerability_minimum_severity: Optional[
        Literal["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFORMATIONAL"]
    ] = Field(
        default=None,
        description="Highest severity tolerated for ECR images.",
    )

    # --- Trusted Advisor --------------------------------------------------
    verify_premium_support_plans: Optional[bool] = None

    # --- CloudTrail threat detection: privilege escalation ----------------
    threat_detection_privilege_escalation_threshold: Optional[float] = Field(
        default=None,
        ge=0.0,
        le=1.0,
        description="Fraction of suspicious actions that triggers the priv-esc detection.",
    )
    threat_detection_privilege_escalation_minutes: Optional[int] = Field(
        default=None,
        ge=5,
        le=43200,
        description=(
            "Lookback window (minutes) for priv-esc detection. Range: 5..43200 "
            "(under 5 min the signal is dominated by false positives)."
        ),
    )
    threat_detection_privilege_escalation_actions: Optional[list[str]] = None

    # --- CloudTrail threat detection: enumeration -------------------------
    threat_detection_enumeration_threshold: Optional[float] = Field(
        default=None,
        ge=0.0,
        le=1.0,
        description="Fraction of suspicious actions that triggers the enumeration detection.",
    )
    threat_detection_enumeration_minutes: Optional[int] = Field(
        default=None,
        ge=5,
        le=43200,
        description="Lookback window (minutes) for enumeration detection. Range: 5..43200.",
    )
    threat_detection_enumeration_actions: Optional[list[str]] = None

    # --- CloudTrail threat detection: LLM jacking -------------------------
    threat_detection_llm_jacking_threshold: Optional[float] = Field(
        default=None,
        ge=0.0,
        le=1.0,
        description="Fraction of suspicious actions that triggers the LLM-jacking detection.",
    )
    threat_detection_llm_jacking_minutes: Optional[int] = Field(
        default=None,
        ge=5,
        le=43200,
        description="Lookback window (minutes) for LLM-jacking detection. Range: 5..43200.",
    )
    threat_detection_llm_jacking_actions: Optional[list[str]] = None

    # --- RDS --------------------------------------------------------------
    check_rds_instance_replicas: Optional[bool] = None

    # --- ACM --------------------------------------------------------------
    days_to_expire_threshold: Optional[int] = Field(
        default=None,
        ge=7,
        le=365,
        description=(
            "Days before certificate expiration to flag. Range: 7..365 "
            "(PCI-DSS 4.2.1.1: alert ≥30 days before expiry; <7 days is too "
            "tight to actually act on)."
        ),
    )
    insecure_key_algorithms: Optional[list[str]] = None

    # --- EKS --------------------------------------------------------------
    eks_required_log_types: Optional[
        list[
            Literal[
                "api",
                "audit",
                "authenticator",
                "controllerManager",
                "scheduler",
            ]
        ]
    ] = Field(
        default=None,
        description="EKS control plane log types that must be enabled.",
    )
    eks_cluster_oldest_version_supported: Annotated[
        Optional[str], AfterValidator(_validate_eks_minor)
    ] = Field(
        default=None,
        description='Minimum supported EKS minor version, expected as "X.Y".',
    )

    # --- CodeBuild --------------------------------------------------------
    excluded_sensitive_environment_variables: Optional[list[str]] = None
    codebuild_github_allowed_organizations: Optional[list[str]] = None

    # --- ELB / ELBv2 ------------------------------------------------------
    elb_min_azs: Optional[int] = Field(
        default=None,
        ge=1,
        le=6,
        description="Min AZs a Classic ELB must span. Range: 1..6.",
    )
    elbv2_min_azs: Optional[int] = Field(
        default=None,
        ge=1,
        le=6,
        description="Min AZs an Application/Network LB must span. Range: 1..6.",
    )

    # --- ElastiCache -----------------------------------------------------
    minimum_snapshot_retention_period: Optional[int] = Field(
        default=None,
        ge=1,
        le=35,
        description="Days an ElastiCache backup must be retained. Range: 1..35 (service hard limit).",
    )

    # --- Secrets ---------------------------------------------------------
    secrets_ignore_patterns: Optional[list[str]] = None
    max_days_secret_unused: Optional[int] = Field(
        default=None,
        ge=7,
        le=365,
        description="Days a Secrets Manager secret can stay unused. Range: 7..365.",
    )
    max_days_secret_unrotated: Optional[int] = Field(
        default=None,
        ge=1,
        le=180,
        description=(
            "Days a Secrets Manager secret can go without rotation. Range: 1..180 "
            "(NIST IA-5: rotate quarterly; CIS recommends ≤90)."
        ),
    )

    # --- Kinesis ---------------------------------------------------------
    min_kinesis_stream_retention_hours: Optional[int] = Field(
        default=None,
        ge=24,
        le=8760,
        description="Hours of Kinesis stream retention. Range: 24..8760 (1 day .. 1 year).",
    )

    # --- detect-secrets plugin list -------------------------------------
    detect_secrets_plugins: Optional[list[_DetectSecretsPlugin]] = None
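A note on `_validate_trusted_ips` above: passing `strict=False` to `ip_network` means an address with host bits set (for example `10.0.0.1/24`) is accepted and treated as its enclosing network, and a bare address is accepted as a single-host network. The standalone sketch below shows that behavior using only the standard library; `is_valid_ip_or_cidr` is a hypothetical helper name, not part of the schema module.

```python
from ipaddress import ip_network


def is_valid_ip_or_cidr(entry: str) -> bool:
    """Return True when the entry parses as an IP address or CIDR range."""
    try:
        ip_network(entry, strict=False)
    except ValueError:
        return False
    return True


for candidate in ("10.0.0.1/24", "192.168.1.10", "2001:db8::/32", "10.0.0.256"):
    print(candidate, is_valid_ip_or_cidr(candidate))

# With strict=False the host bits are masked off rather than rejected.
print(ip_network("10.0.0.1/24", strict=False))
```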
+91
View File
@@ -0,0 +1,91 @@
"""Azure provider config schema with safety bounds.
Bounds aim for values that produce a meaningful security check; out-of-range
values are dropped (SDK runtime) or rejected (Prowler App backend).
"""
from typing import Annotated, Literal, Optional
from pydantic import AfterValidator, Field
from prowler.config.schema.base import ProviderConfigBase
def _validate_dotted_version(v: Optional[str]) -> Optional[str]:
    """Accept ``"8.2"``, ``"3.12"``, ``"17"`` style version strings.

    Used by App Service language version fields where the upstream APIs
    accept either ``MAJOR`` or ``MAJOR.MINOR`` notation.
    """
    if v is None:
        return v
    parts = v.split(".")
    if not (1 <= len(parts) <= 2) or not all(p.isdigit() for p in parts):
        raise ValueError(f"{v!r} is not a valid version (expected 'X' or 'X.Y')")
    return v


class AzureProviderConfig(ProviderConfigBase):
    # --- Network ---------------------------------------------------------
    shodan_api_key: Optional[str] = Field(
        default=None,
        max_length=512,
        description="API key for Shodan lookups on Azure public IPs.",
    )

    # --- Defender --------------------------------------------------------
    defender_attack_path_minimal_risk_level: Optional[
        Literal["Low", "Medium", "High", "Critical"]
    ] = Field(
        default=None,
        description="Minimum attack-path risk level worth a notification.",
    )

    # --- App Service ----------------------------------------------------
    php_latest_version: Annotated[
        Optional[str], AfterValidator(_validate_dotted_version)
    ] = Field(default=None, description='PHP minimum acceptable version, e.g. "8.2".')
    python_latest_version: Annotated[
        Optional[str], AfterValidator(_validate_dotted_version)
    ] = Field(
        default=None, description='Python minimum acceptable version, e.g. "3.12".'
    )
    java_latest_version: Annotated[
        Optional[str], AfterValidator(_validate_dotted_version)
    ] = Field(default=None, description='Java minimum acceptable version, e.g. "17".')

    # --- SQL ------------------------------------------------------------
    recommended_minimal_tls_versions: Optional[list[Literal["1.2", "1.3"]]] = Field(
        default=None,
        description="TLS versions accepted on Azure SQL Server.",
    )

    # --- Virtual Machines -----------------------------------------------
    desired_vm_sku_sizes: Optional[list[str]] = None
    vm_backup_min_daily_retention_days: Optional[int] = Field(
        default=None,
        ge=7,
        le=9999,
        description=(
            "Min daily backup retention days. Range: 7..9999 "
            "(Azure Backup hard limit; <7 days defeats DR/ransomware recovery)."
        ),
    )

    # --- API Management threat detection (LLM jacking) -----------------
    apim_threat_detection_llm_jacking_threshold: Optional[float] = Field(
        default=None,
        ge=0.0,
        le=1.0,
        description="Fraction of suspicious actions that triggers the detection.",
    )
    apim_threat_detection_llm_jacking_minutes: Optional[int] = Field(
        default=None,
        ge=5,
        le=43200,
        description=(
            "Lookback window (minutes) for LLM-jacking detection. Range: 5..43200 "
            "(under 5 min the signal is dominated by false positives)."
        ),
    )
    apim_threat_detection_llm_jacking_actions: Optional[list[str]] = None
+17
View File
@@ -0,0 +1,17 @@
from pydantic import BaseModel, ConfigDict
class ProviderConfigBase(BaseModel):
    """Base for every provider config schema.

    ``extra="allow"`` is REQUIRED for backwards compatibility: third-party
    check plugins frequently introduce config keys we do not know about,
    and pre-existing user configs may carry deprecated keys. Validation
    must never reject these.
    """

    model_config = ConfigDict(
        extra="allow",
        str_strip_whitespace=True,
        validate_assignment=False,
    )
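The effect of these `ConfigDict` settings can be checked in isolation. The sketch below rebuilds an equivalent base class under a different name (`DemoBase`, hypothetical) with one illustrative field, so it does not depend on the Prowler package being importable.

```python
from typing import Optional

from pydantic import BaseModel, ConfigDict


class DemoBase(BaseModel):
    """Hypothetical copy of the base class settings, for illustration."""

    model_config = ConfigDict(
        extra="allow",
        str_strip_whitespace=True,
        validate_assignment=False,
    )


class DemoConfig(DemoBase):
    # Illustrative field; subclasses inherit model_config from DemoBase.
    api_key: Optional[str] = None


cfg = DemoConfig.model_validate({"api_key": "  abc123  ", "plugin_flag": True})
dumped = cfg.model_dump()

# Declared string fields are stripped; unknown keys are kept as-is.
print(dumped)
```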
+18
View File
@@ -0,0 +1,18 @@
"""Cloudflare provider config schema with safety bounds."""
from typing import Optional
from pydantic import Field
from prowler.config.schema.base import ProviderConfigBase
class CloudflareProviderConfig(ProviderConfigBase):
    max_retries: Optional[int] = Field(
        default=None,
        ge=0,
        le=10,
        description=(
            "Max retries for Cloudflare API requests. Range: 0..10 (0 disables retries)."
        ),
    )
+45
View File
@@ -0,0 +1,45 @@
"""GCP provider config schema with safety bounds."""
from typing import Optional
from pydantic import Field
from prowler.config.schema.base import ProviderConfigBase
class GCPProviderConfig(ProviderConfigBase):
    shodan_api_key: Optional[str] = Field(
        default=None,
        max_length=512,
        description="API key for Shodan lookups on GCP public IPs.",
    )
    mig_min_zones: Optional[int] = Field(
        default=None,
        ge=1,
        le=5,
        description="Min zones a Managed Instance Group must span. Range: 1..5.",
    )
    max_snapshot_age_days: Optional[int] = Field(
        default=None,
        ge=1,
        le=1095,
        description=(
            "Days a disk snapshot can age before being flagged. Range: 1..1095 "
            "(3 years; older snapshots typically miss data-class compliance)."
        ),
    )
    max_unused_account_days: Optional[int] = Field(
        default=None,
        ge=30,
        le=365,
        description=(
            "Days a service account or user-managed key can stay unused. "
            "Range: 30..365."
        ),
    )
    storage_min_retention_days: Optional[int] = Field(
        default=None,
        ge=1,
        le=3650,
        description="Min retention period on Cloud Storage buckets. Range: 1..3650.",
    )
+20
View File
@@ -0,0 +1,20 @@
"""GitHub provider config schema with safety bounds."""
from typing import Optional
from pydantic import Field
from prowler.config.schema.base import ProviderConfigBase
class GitHubProviderConfig(ProviderConfigBase):
    inactive_not_archived_days_threshold: Optional[int] = Field(
        default=None,
        ge=30,
        le=3650,
        description=(
            "Days a repository can stay inactive without being archived before "
            "being flagged. Range: 30..3650 (CIS GitHub recommends 180; "
            "<30 days produces false positives on seasonal projects)."
        ),
    )
+45
View File
@@ -0,0 +1,45 @@
"""Kubernetes provider config schema with safety bounds."""
from typing import Optional
from pydantic import Field
from prowler.config.schema.base import ProviderConfigBase
class KubernetesProviderConfig(ProviderConfigBase):
    audit_log_maxbackup: Optional[int] = Field(
        default=None,
        ge=2,
        le=1000,
        description=(
            "API server audit log file rotations to keep. Range: 2..1000 "
            "(CIS Kubernetes 1.2.18 recommends ≥10)."
        ),
    )
    audit_log_maxsize: Optional[int] = Field(
        default=None,
        ge=10,
        le=10000,
        description=(
            "Max MB per audit log file before rotation. Range: 10..10000 MB "
            "(CIS Kubernetes 1.2.19 recommends ≥100 MB)."
        ),
    )
    audit_log_maxage: Optional[int] = Field(
        default=None,
        ge=7,
        le=3650,
        description=(
            "Days an audit log file is retained. Range: 7..3650 "
            "(CIS Kubernetes 1.2.17 recommends ≥30 days)."
        ),
    )
    apiserver_strong_ciphers: Optional[list[str]] = Field(
        default=None,
        description="Whitelist of strong TLS cipher suites required on the API server.",
    )
    kubelet_strong_ciphers: Optional[list[str]] = Field(
        default=None,
        description="Whitelist of strong TLS cipher suites required on kubelet.",
    )
+54
View File
@@ -0,0 +1,54 @@
"""M365 provider config schema with safety bounds."""
from typing import Optional
from pydantic import Field
from prowler.config.schema.base import ProviderConfigBase
class M365ProviderConfig(ProviderConfigBase):
    # --- Entra (sign-in policy) ----------------------------------------
    sign_in_frequency: Optional[int] = Field(
        default=None,
        ge=1,
        le=168,
        description=(
            "Hours between forced sign-ins for admin users. Range: 1..168 (1 h .. 7 days). "
            "Microsoft Conditional Access baseline for admin roles is ≤24 h."
        ),
    )

    # --- Teams ---------------------------------------------------------
    allowed_cloud_storage_services: Optional[list[str]] = Field(
        default=None,
        description="External cloud storage services allowed in Teams.",
    )

    # --- Exchange ------------------------------------------------------
    recommended_mailtips_large_audience_threshold: Optional[int] = Field(
        default=None,
        ge=5,
        le=10000,
        description=(
            "Recipient count that should trigger a 'large audience' MailTip. "
            "Range: 5..10000 (Microsoft default 25)."
        ),
    )

    # --- Defender malware policy --------------------------------------
    default_recommended_extensions: Optional[list[str]] = Field(
        default=None,
        description="File extensions blocked by the malware policy.",
    )

    # --- Mailbox auditing ---------------------------------------------
    audit_log_age: Optional[int] = Field(
        default=None,
        ge=30,
        le=3650,
        description=(
            "Days mailbox audit logs must be retained. Range: 30..3650 "
            "(M365 E3 default is 90 days; SEC/FINRA require ≥7 years)."
        ),
    )
+19
View File
@@ -0,0 +1,19 @@
"""MongoDB Atlas provider config schema with safety bounds."""
from typing import Optional
from pydantic import Field
from prowler.config.schema.base import ProviderConfigBase
class MongoDBAtlasProviderConfig(ProviderConfigBase):
    max_service_account_secret_validity_hours: Optional[int] = Field(
        default=None,
        ge=1,
        le=720,
        description=(
            "Max hours a service account secret can stay valid. "
            "Range: 1..720 (1 h .. 30 days)."
        ),
    )
+28
@@ -0,0 +1,28 @@
"""Mapping of provider name to its Pydantic schema class.
Kept in its own module so the validator stays free of provider-schema imports
and callers pay the import cost only when they actually need the registry.
"""
from prowler.config.schema.aws import AWSProviderConfig
from prowler.config.schema.azure import AzureProviderConfig
from prowler.config.schema.base import ProviderConfigBase
from prowler.config.schema.cloudflare import CloudflareProviderConfig
from prowler.config.schema.gcp import GCPProviderConfig
from prowler.config.schema.github import GitHubProviderConfig
from prowler.config.schema.kubernetes import KubernetesProviderConfig
from prowler.config.schema.m365 import M365ProviderConfig
from prowler.config.schema.mongodbatlas import MongoDBAtlasProviderConfig
from prowler.config.schema.vercel import VercelProviderConfig
SCHEMAS: dict[str, type[ProviderConfigBase]] = {
"aws": AWSProviderConfig,
"azure": AzureProviderConfig,
"gcp": GCPProviderConfig,
"kubernetes": KubernetesProviderConfig,
"m365": M365ProviderConfig,
"github": GitHubProviderConfig,
"mongodbatlas": MongoDBAtlasProviderConfig,
"cloudflare": CloudflareProviderConfig,
"vercel": VercelProviderConfig,
}
+61
@@ -0,0 +1,61 @@
from typing import Any
from pydantic import ValidationError
from prowler.config.schema.base import ProviderConfigBase
from prowler.lib.logger import logger
def validate_provider_config(
provider: str,
raw: Any,
schema_cls: type[ProviderConfigBase] | None,
) -> dict:
"""Validate a provider's config dict against its Pydantic schema.
Behavior is intentionally lenient to preserve backwards compatibility:
- If ``raw`` is not a dict, return an empty dict (mirrors the prior loader).
- If no schema is registered for ``provider``, return ``raw`` untouched.
- On validation errors, log a WARNING for each reported error, DROP the
offending keys from the result, and continue. Consumers fall back to
their own hard-coded defaults via ``audit_config.get(key, default)``.
- Coerced values (e.g. ``"180"`` -> ``180``) replace the user's input
so that downstream checks never receive a wrongly-typed value.
"""
if not isinstance(raw, dict):
return {}
if schema_cls is None:
return raw
try:
model = schema_cls.model_validate(raw)
return model.model_dump(exclude_unset=True)
except ValidationError as exc:
bad_keys: set[str] = set()
for err in exc.errors():
loc = err.get("loc") or ()
if not loc:
continue
key = loc[0]
if not isinstance(key, str):
continue
bad_keys.add(key)
logger.warning(
f"prowler.config[{provider}.{key}] = {raw.get(key)!r} is invalid "
f"({err.get('msg', 'validation error')}); the value will be ignored "
f"and the built-in default will be used."
)
cleaned = {k: v for k, v in raw.items() if k not in bad_keys}
try:
model = schema_cls.model_validate(cleaned)
return model.model_dump(exclude_unset=True)
except ValidationError as exc2:
logger.error(
f"prowler.config[{provider}] could not be revalidated after dropping "
f"invalid keys ({bad_keys}); passing through the cleaned dict as-is. "
f"Underlying errors: {exc2.errors()}"
)
return cleaned
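The drop-and-retry behaviour of the validator above can be reproduced in isolation. The following is a minimal sketch, assuming Pydantic v2 and a hypothetical `ToyProviderConfig` model with two bounded fields; neither the model nor `drop_invalid_keys` is part of Prowler, and the real function additionally logs warnings and handles a failed revalidation.

```python
from typing import Any, Optional

from pydantic import BaseModel, ConfigDict, Field, ValidationError


class ToyProviderConfig(BaseModel):
    """Hypothetical stand-in for a provider schema (not a Prowler class)."""

    # Assumption: unknown keys are kept, as the tests in this diff describe.
    model_config = ConfigDict(extra="allow")

    lambda_min_azs: Optional[int] = Field(default=None, ge=1, le=6)
    threshold: Optional[float] = Field(default=None, ge=0.0, le=1.0)


def drop_invalid_keys(raw: dict[str, Any]) -> dict[str, Any]:
    """Validate ``raw``; on failure, drop the offending top-level keys and retry."""
    try:
        return ToyProviderConfig.model_validate(raw).model_dump(exclude_unset=True)
    except ValidationError as exc:
        # The first element of each error location is the top-level field name.
        bad_keys = {err["loc"][0] for err in exc.errors() if err["loc"]}
        cleaned = {k: v for k, v in raw.items() if k not in bad_keys}
        return ToyProviderConfig.model_validate(cleaned).model_dump(exclude_unset=True)


# "2" is coerced to the int 2; 5.0 violates le=1.0, so that key is dropped.
result = drop_invalid_keys({"lambda_min_azs": "2", "threshold": 5.0})
print(result)  # {'lambda_min_azs': 2}
```

Returning `model_dump(exclude_unset=True)` rather than the raw dict is what lets the coerced value replace the user's input while fields the user never set stay absent.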
+61
@@ -0,0 +1,61 @@
"""Vercel provider config schema with safety bounds."""
from typing import Optional
from pydantic import Field
from prowler.config.schema.base import ProviderConfigBase
class VercelProviderConfig(ProviderConfigBase):
stable_branches: Optional[list[str]] = Field(
default=None,
description="Branches considered stable for production deployments.",
)
days_to_expire_threshold: Optional[int] = Field(
default=None,
ge=7,
le=365,
description=(
"Days before token/certificate expiration to flag. Range: 7..365 "
"(PCI-DSS 4.2.1.1: alert ≥30 days before expiry)."
),
)
stale_token_threshold_days: Optional[int] = Field(
default=None,
ge=30,
le=3650,
description=(
"Days of inactivity before a token is considered stale. Range: 30..3650 "
"(NIST AC-2(3) typical window 30..90 days)."
),
)
stale_invitation_threshold_days: Optional[int] = Field(
default=None,
ge=7,
le=365,
description=(
"Days a pending invitation can stay open. Range: 7..365 "
"(OWASP ASVS 2.7.1 recommends short-lived invitations)."
),
)
max_owner_percentage: Optional[int] = Field(
default=None,
ge=1,
le=50,
description=(
"Max percentage of team members that can have the OWNER role. "
"Range: 1..50 (PoLP — having >50% of a team as OWNER defeats RBAC; "
"industry guidance recommends ≤25%)."
),
)
max_owners: Optional[int] = Field(
default=None,
ge=1,
le=1000,
description="Absolute max owners (overrides percentage for large teams). Range: 1..1000.",
)
secret_suffixes: Optional[list[str]] = Field(
default=None,
description="Suffixes that mark a project env var as secret-like.",
)
+222
@@ -0,0 +1,222 @@
"""AWS-specific schema coverage — the biggest provider, with the richest
constraint surface (CIDRs, account IDs, port ranges, enums, thresholds)."""
import pytest
from prowler.config.schema.aws import AWSProviderConfig
from prowler.config.schema.validator import validate_provider_config
def _validate(raw):
return validate_provider_config("aws", raw, AWSProviderConfig)
class Test_AWS_Threat_Detection_Thresholds:
"""All threat detection thresholds are documented as fractions in 0..1.
The biggest risk of mistyping them is silently disabling the check."""
@pytest.mark.parametrize(
"key",
[
"threat_detection_privilege_escalation_threshold",
"threat_detection_enumeration_threshold",
"threat_detection_llm_jacking_threshold",
],
)
def test_valid_boundary_values(self, key):
assert _validate({key: 0.0}) == {key: 0.0}
assert _validate({key: 1.0}) == {key: 1.0}
assert _validate({key: 0.5}) == {key: 0.5}
@pytest.mark.parametrize(
"key",
[
"threat_detection_privilege_escalation_threshold",
"threat_detection_enumeration_threshold",
"threat_detection_llm_jacking_threshold",
],
)
def test_invalid_values_are_dropped(self, key):
# 20 instead of 0.2 — would never trigger
assert _validate({key: 20}) == {}
# negative
assert _validate({key: -0.1}) == {}
# string
assert _validate({key: "high"}) == {}
class Test_AWS_Trusted_Account_Ids:
def test_valid_twelve_digit_ids(self):
ids = ["123456789012", "098765432109"]
assert _validate({"trusted_account_ids": ids}) == {"trusted_account_ids": ids}
def test_empty_list_is_valid(self):
assert _validate({"trusted_account_ids": []}) == {"trusted_account_ids": []}
def test_short_id_is_dropped(self):
assert _validate({"trusted_account_ids": ["12345"]}) == {}
def test_non_numeric_id_is_dropped(self):
assert _validate({"trusted_account_ids": ["1234abcd5678"]}) == {}
def test_id_with_dashes_is_dropped(self):
# Some users format account IDs as "1234-5678-9012"
assert _validate({"trusted_account_ids": ["1234-5678-9012"]}) == {}
class Test_AWS_Trusted_Ips:
def test_single_ipv4_address(self):
assert _validate({"trusted_ips": ["1.2.3.4"]}) == {"trusted_ips": ["1.2.3.4"]}
def test_ipv4_cidr(self):
assert _validate({"trusted_ips": ["10.0.0.0/8"]}) == {
"trusted_ips": ["10.0.0.0/8"]
}
def test_ipv6_address(self):
assert _validate({"trusted_ips": ["2001:db8::1"]}) == {
"trusted_ips": ["2001:db8::1"]
}
def test_ipv6_cidr(self):
assert _validate({"trusted_ips": ["2001:db8::/32"]}) == {
"trusted_ips": ["2001:db8::/32"]
}
def test_mixed_list(self):
ips = ["1.2.3.4", "10.0.0.0/8", "2001:db8::1"]
assert _validate({"trusted_ips": ips}) == {"trusted_ips": ips}
def test_garbage_entry_is_dropped(self):
assert _validate({"trusted_ips": ["definitely-not-an-ip"]}) == {}
def test_cidr_with_host_bits_is_accepted(self):
# We use strict=False so "10.0.0.5/8" is accepted. This matches the
# behaviour of most security tools and avoids surprising users who
# paste real-world allowlists with non-canonical CIDR notation.
assert _validate({"trusted_ips": ["10.0.0.5/8"]}) == {
"trusted_ips": ["10.0.0.5/8"]
}
class Test_AWS_Ports:
def test_valid_ports_in_range(self):
ports = [25, 80, 443, 65535, 1]
assert _validate({"ec2_high_risk_ports": ports}) == {
"ec2_high_risk_ports": ports
}
def test_port_zero_is_dropped(self):
# Port 0 is reserved and not a valid security signal.
assert _validate({"ec2_high_risk_ports": [0]}) == {}
def test_out_of_range_port_is_dropped(self):
assert _validate({"ec2_high_risk_ports": [70000]}) == {}
def test_negative_port_is_dropped(self):
assert _validate({"ec2_high_risk_ports": [-1]}) == {}
class Test_AWS_Enums:
@pytest.mark.parametrize("level", ["CRITICAL", "HIGH", "MEDIUM", "LOW"])
def test_valid_severity_levels(self, level):
assert _validate({"ecr_repository_vulnerability_minimum_severity": level}) == {
"ecr_repository_vulnerability_minimum_severity": level
}
@pytest.mark.parametrize("level", ["critical", "Medium", "ANY", "", "X"])
def test_invalid_severity_levels_are_dropped(self, level):
assert _validate({"ecr_repository_vulnerability_minimum_severity": level}) == {}
class Test_AWS_Detect_Secrets_Plugins:
def test_plugin_without_limit(self):
out = _validate({"detect_secrets_plugins": [{"name": "AWSKeyDetector"}]})
assert out == {"detect_secrets_plugins": [{"name": "AWSKeyDetector"}]}
def test_plugin_with_limit(self):
out = _validate(
{
"detect_secrets_plugins": [
{"name": "Base64HighEntropyString", "limit": 6.0}
]
}
)
assert out == {
"detect_secrets_plugins": [
{"name": "Base64HighEntropyString", "limit": 6.0}
]
}
def test_plugin_missing_name_drops_whole_field(self):
# ``name`` is required by the upstream library.
out = _validate({"detect_secrets_plugins": [{"limit": 6.0}]})
assert out == {}
def test_extra_plugin_kwargs_pass_through(self):
# Plugins can have arbitrary extra params (extra="allow" on the
# nested model). They must round-trip.
out = _validate(
{
"detect_secrets_plugins": [
{"name": "Custom", "my_param": "abc", "other": 42}
]
}
)
assert out == {
"detect_secrets_plugins": [
{"name": "Custom", "my_param": "abc", "other": 42}
]
}
class Test_AWS_Booleans:
@pytest.mark.parametrize(
"key",
[
"mute_non_default_regions",
"verify_premium_support_plans",
"check_rds_instance_replicas",
],
)
def test_true_and_false_round_trip(self, key):
assert _validate({key: True}) == {key: True}
assert _validate({key: False}) == {key: False}
def test_yaml_style_boolean_coercion(self):
# YAML can produce the Python str "yes"/"true" if the user quoted it.
# Pydantic v2 coerces these strings to bool in its default (lax) mode,
# while strict mode would reject them. Either outcome is acceptable
# here, but a surviving value must be a real bool, never a truthy str
# (which would be dangerous for verify_premium_support_plans).
out = _validate({"verify_premium_support_plans": "yes"})
if "verify_premium_support_plans" in out:
assert isinstance(out["verify_premium_support_plans"], bool)
class Test_AWS_Full_Default_Config_Round_Trips:
"""Loading the real shipped defaults through the schema must produce
exactly the same dict. This is the regression sentinel for backwards
compatibility."""
def test_full_default_config_round_trip(self):
# Subset that mirrors the shipped config.yaml semantics.
raw = {
"mute_non_default_regions": False,
"disallowed_regions": ["me-south-1", "me-central-1"],
"max_unused_access_keys_days": 45,
"max_ec2_instance_age_in_days": 180,
"trusted_account_ids": [],
"trusted_ips": [],
"ecr_repository_vulnerability_minimum_severity": "MEDIUM",
"threat_detection_privilege_escalation_threshold": 0.2,
"threat_detection_enumeration_threshold": 0.3,
"threat_detection_llm_jacking_threshold": 0.4,
"ec2_high_risk_ports": [25, 110, 8088],
"detect_secrets_plugins": [
{"name": "AWSKeyDetector"},
{"name": "Base64HighEntropyString", "limit": 6.0},
],
}
assert _validate(raw) == raw
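The `trusted_ips` tests above mention `strict=False`, which suggests a check built on the standard-library `ipaddress` module. The sketch below shows that technique under that assumption; `is_valid_ip_or_cidr` is a hypothetical helper, not the validator shipped in `AWSProviderConfig`.

```python
import ipaddress


def is_valid_ip_or_cidr(value: str) -> bool:
    """Return True if ``value`` parses as an IPv4/IPv6 address or network."""
    try:
        # strict=False tolerates host bits in the address part ("10.0.0.5/8").
        # A bare address such as "1.2.3.4" is parsed as a /32 (or /128) network.
        ipaddress.ip_network(value, strict=False)
        return True
    except ValueError:
        return False


for candidate in ["1.2.3.4", "10.0.0.5/8", "2001:db8::/32", "definitely-not-an-ip"]:
    print(candidate, is_valid_ip_or_cidr(candidate))
```

With `strict=True` the second candidate would be rejected, which is the behaviour the test comment says was deliberately avoided for pasted real-world allowlists.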
+398
@@ -0,0 +1,398 @@
"""Boundary tests for the safety bounds added on top of the upstream schemas.
Each parametrised case checks (a) the min and max values are accepted and
(b) one step outside the range is rejected. Custom validators (semver,
EKS minor, dotted version, port range, account IDs, IPs) get focused
positive/negative tests.
Tests use the public adapter ``prowler.config.scan_config_schema``: a
schema violation surfaces as a list of ``{"path", "message"}`` entries.
This keeps the contract the Prowler App backend depends on under test.
"""
import pytest
from prowler.config.scan_config_schema import validate_scan_config
def _has_error_for(errors, path_substr: str) -> bool:
return any(path_substr in e["path"] for e in errors)
# Each tuple: (provider, key, min_allowed, max_allowed)
INT_BOUND_CASES = [
# AWS
("aws", "max_unused_access_keys_days", 30, 180),
("aws", "max_console_access_days", 30, 180),
("aws", "max_unused_sagemaker_access_days", 7, 180),
("aws", "max_security_group_rules", 1, 1000),
("aws", "max_ec2_instance_age_in_days", 1, 1095),
("aws", "recommended_cdk_bootstrap_version", 1, 100),
("aws", "max_idle_disconnect_timeout_in_seconds", 60, 1800),
("aws", "max_disconnect_timeout_in_seconds", 60, 3600),
("aws", "max_session_duration_seconds", 600, 86400),
("aws", "lambda_min_azs", 1, 6),
("aws", "threat_detection_privilege_escalation_minutes", 5, 43200),
("aws", "threat_detection_enumeration_minutes", 5, 43200),
("aws", "threat_detection_llm_jacking_minutes", 5, 43200),
("aws", "days_to_expire_threshold", 7, 365),
("aws", "elb_min_azs", 1, 6),
("aws", "elbv2_min_azs", 1, 6),
("aws", "minimum_snapshot_retention_period", 1, 35),
("aws", "max_days_secret_unused", 7, 365),
("aws", "max_days_secret_unrotated", 1, 180),
("aws", "min_kinesis_stream_retention_hours", 24, 8760),
# Azure
("azure", "vm_backup_min_daily_retention_days", 7, 9999),
("azure", "apim_threat_detection_llm_jacking_minutes", 5, 43200),
# GCP
("gcp", "mig_min_zones", 1, 5),
("gcp", "max_snapshot_age_days", 1, 1095),
("gcp", "max_unused_account_days", 30, 365),
("gcp", "storage_min_retention_days", 1, 3650),
# Kubernetes
("kubernetes", "audit_log_maxbackup", 2, 1000),
("kubernetes", "audit_log_maxsize", 10, 10000),
("kubernetes", "audit_log_maxage", 7, 3650),
# M365
("m365", "sign_in_frequency", 1, 168),
("m365", "recommended_mailtips_large_audience_threshold", 5, 10000),
("m365", "audit_log_age", 30, 3650),
# GitHub
("github", "inactive_not_archived_days_threshold", 30, 3650),
# MongoDB Atlas
("mongodbatlas", "max_service_account_secret_validity_hours", 1, 720),
# Cloudflare
("cloudflare", "max_retries", 0, 10),
# Vercel
("vercel", "days_to_expire_threshold", 7, 365),
("vercel", "stale_token_threshold_days", 30, 3650),
("vercel", "stale_invitation_threshold_days", 7, 365),
("vercel", "max_owner_percentage", 1, 50),
("vercel", "max_owners", 1, 1000),
]
FLOAT_THRESHOLD_FIELDS = [
("aws", "threat_detection_privilege_escalation_threshold"),
("aws", "threat_detection_enumeration_threshold"),
("aws", "threat_detection_llm_jacking_threshold"),
("azure", "apim_threat_detection_llm_jacking_threshold"),
]
class TestIntegerBounds:
"""Each int field accepts both ends of its range and rejects ±1 outside."""
@pytest.mark.parametrize("provider, key, lo, hi", INT_BOUND_CASES)
def test_min_accepted(self, provider, key, lo, hi):
assert validate_scan_config({provider: {key: lo}}) == []
@pytest.mark.parametrize("provider, key, lo, hi", INT_BOUND_CASES)
def test_max_accepted(self, provider, key, lo, hi):
assert validate_scan_config({provider: {key: hi}}) == []
@pytest.mark.parametrize("provider, key, lo, hi", INT_BOUND_CASES)
def test_below_min_rejected(self, provider, key, lo, hi):
errors = validate_scan_config({provider: {key: lo - 1}})
assert _has_error_for(errors, f"{provider}.{key}"), errors
@pytest.mark.parametrize("provider, key, lo, hi", INT_BOUND_CASES)
def test_above_max_rejected(self, provider, key, lo, hi):
errors = validate_scan_config({provider: {key: hi + 1}})
assert _has_error_for(errors, f"{provider}.{key}"), errors
class TestFloatThresholds:
"""Threshold floats must stay within 0..1 inclusive."""
@pytest.mark.parametrize("provider, key", FLOAT_THRESHOLD_FIELDS)
def test_zero_and_one_accepted(self, provider, key):
assert validate_scan_config({provider: {key: 0.0}}) == []
assert validate_scan_config({provider: {key: 1.0}}) == []
assert validate_scan_config({provider: {key: 0.5}}) == []
@pytest.mark.parametrize("provider, key", FLOAT_THRESHOLD_FIELDS)
def test_negative_rejected(self, provider, key):
errors = validate_scan_config({provider: {key: -0.01}})
assert _has_error_for(errors, f"{provider}.{key}")
@pytest.mark.parametrize("provider, key", FLOAT_THRESHOLD_FIELDS)
def test_above_one_rejected(self, provider, key):
errors = validate_scan_config({provider: {key: 1.01}})
assert _has_error_for(errors, f"{provider}.{key}")
class TestCloudWatchRetention:
"""`log_group_retention_days` only accepts the AWS-approved enum values."""
@pytest.mark.parametrize("value", [1, 7, 30, 365, 731, 3653])
def test_valid_values_accepted(self, value):
assert validate_scan_config({"aws": {"log_group_retention_days": value}}) == []
@pytest.mark.parametrize("value", [0, 2, 42, 500, 999, 4000])
def test_invalid_values_rejected(self, value):
errors = validate_scan_config({"aws": {"log_group_retention_days": value}})
assert _has_error_for(errors, "aws.log_group_retention_days")
class TestSemverValidator:
"""AWS Fargate platform versions: X.Y.Z."""
@pytest.mark.parametrize("value", ["1.4.0", "1.0.0", "0.0.1", "10.20.30"])
def test_accepts_semver(self, value):
assert (
validate_scan_config({"aws": {"fargate_linux_latest_version": value}}) == []
)
@pytest.mark.parametrize("value", ["1.4", "1", "v1.4.0", "1.4.0-beta", "a.b.c", ""])
def test_rejects_non_semver(self, value):
errors = validate_scan_config({"aws": {"fargate_linux_latest_version": value}})
assert _has_error_for(errors, "aws.fargate_linux_latest_version")
class TestEksVersionValidator:
"""`eks_cluster_oldest_version_supported` expects MAJOR.MINOR."""
@pytest.mark.parametrize("value", ["1.28", "1.29", "1.30", "2.0"])
def test_accepts_minor(self, value):
assert (
validate_scan_config(
{"aws": {"eks_cluster_oldest_version_supported": value}}
)
== []
)
@pytest.mark.parametrize("value", ["1.28.0", "v1.28", "1", "1.x", ""])
def test_rejects_invalid(self, value):
errors = validate_scan_config(
{"aws": {"eks_cluster_oldest_version_supported": value}}
)
assert _has_error_for(errors, "aws.eks_cluster_oldest_version_supported")
class TestEksLogTypesEnum:
"""Only the documented log types are accepted."""
def test_full_enum_accepted(self):
assert (
validate_scan_config(
{
"aws": {
"eks_required_log_types": [
"api",
"audit",
"authenticator",
"controllerManager",
"scheduler",
]
}
}
)
== []
)
def test_unknown_type_rejected(self):
errors = validate_scan_config(
{"aws": {"eks_required_log_types": ["api", "telemetry"]}}
)
assert _has_error_for(errors, "aws.eks_required_log_types")
class TestAzureDottedVersion:
"""App Service versions accept 'X' and 'X.Y' but not 'X.Y.Z' or junk."""
@pytest.mark.parametrize("value", ["8.2", "3.12", "17"])
def test_accepts(self, value):
assert validate_scan_config({"azure": {"php_latest_version": value}}) == []
assert validate_scan_config({"azure": {"python_latest_version": value}}) == []
assert validate_scan_config({"azure": {"java_latest_version": value}}) == []
@pytest.mark.parametrize("value", ["8.2.0", "v8", "8.x", ""])
def test_rejects(self, value):
errors = validate_scan_config({"azure": {"php_latest_version": value}})
assert _has_error_for(errors, "azure.php_latest_version")
class TestAzureTlsLiteralEnum:
"""Only TLS 1.2 and 1.3 are tolerated by the recommended list."""
def test_accepted_versions(self):
assert (
validate_scan_config(
{"azure": {"recommended_minimal_tls_versions": ["1.2", "1.3"]}}
)
== []
)
@pytest.mark.parametrize("value", ["1.0", "1.1", "2.0", ""])
def test_unknown_version_rejected(self, value):
errors = validate_scan_config(
{"azure": {"recommended_minimal_tls_versions": [value]}}
)
assert _has_error_for(errors, "azure.recommended_minimal_tls_versions")
class TestAzureRiskLevelLiteral:
"""Defender attack-path risk level is a closed enum."""
@pytest.mark.parametrize("value", ["Low", "Medium", "High", "Critical"])
def test_accepted(self, value):
assert (
validate_scan_config(
{"azure": {"defender_attack_path_minimal_risk_level": value}}
)
== []
)
@pytest.mark.parametrize("value", ["low", "CRITICAL", "Severe", ""])
def test_rejected(self, value):
errors = validate_scan_config(
{"azure": {"defender_attack_path_minimal_risk_level": value}}
)
assert _has_error_for(errors, "azure.defender_attack_path_minimal_risk_level")
class TestECRSeverityLiteral:
"""ECR severity is a closed enum (with INFORMATIONAL allowed)."""
@pytest.mark.parametrize(
"value",
["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFORMATIONAL"],
)
def test_accepted(self, value):
assert (
validate_scan_config(
{"aws": {"ecr_repository_vulnerability_minimum_severity": value}}
)
== []
)
@pytest.mark.parametrize("value", ["URGENT", "low", "Crit", ""])
def test_rejected(self, value):
errors = validate_scan_config(
{"aws": {"ecr_repository_vulnerability_minimum_severity": value}}
)
assert _has_error_for(
errors, "aws.ecr_repository_vulnerability_minimum_severity"
)
class TestPortRangeValidator:
"""Each entry of `ec2_high_risk_ports` must be 1..65535 (0 is reserved)."""
def test_valid_ports(self):
assert (
validate_scan_config({"aws": {"ec2_high_risk_ports": [1, 22, 8080, 65535]}})
== []
)
@pytest.mark.parametrize("value", [-1, 0, 65536, 99999])
def test_invalid_port_rejected(self, value):
errors = validate_scan_config({"aws": {"ec2_high_risk_ports": [80, value]}})
assert _has_error_for(errors, "aws.ec2_high_risk_ports")
class TestAccountIdsValidator:
"""AWS account IDs are 12-digit strings."""
def test_valid(self):
assert (
validate_scan_config(
{"aws": {"trusted_account_ids": ["123456789012", "098765432109"]}}
)
== []
)
@pytest.mark.parametrize(
"value", ["12345", "12345678901", "1234567890123", "12345678901a"]
)
def test_invalid_rejected(self, value):
errors = validate_scan_config({"aws": {"trusted_account_ids": [value]}})
assert _has_error_for(errors, "aws.trusted_account_ids")
class TestTrustedIpsValidator:
"""Trusted IPs accept IPv4, IPv6, and CIDR; reject junk."""
@pytest.mark.parametrize(
"value",
["1.2.3.4", "10.0.0.0/8", "2001:db8::1", "2001:db8::/32"],
)
def test_valid(self, value):
assert validate_scan_config({"aws": {"trusted_ips": [value]}}) == []
@pytest.mark.parametrize(
"value", ["not.an.ip", "1.2.3.300", "10.0.0.0/40", "::ffff:::"]
)
def test_invalid_rejected(self, value):
errors = validate_scan_config({"aws": {"trusted_ips": [value]}})
assert _has_error_for(errors, "aws.trusted_ips")
class TestDetectSecretsEntropyBound:
"""`detect_secrets_plugins[].limit` is Shannon entropy: 0..10."""
@pytest.mark.parametrize("value", [0.0, 3.5, 4.5, 8.0, 10.0])
def test_valid(self, value):
assert (
validate_scan_config(
{
"aws": {
"detect_secrets_plugins": [
{"name": "Base64HighEntropyString", "limit": value}
]
}
}
)
== []
)
@pytest.mark.parametrize("value", [-0.1, 10.01, 50])
def test_invalid(self, value):
errors = validate_scan_config(
{
"aws": {
"detect_secrets_plugins": [
{"name": "Base64HighEntropyString", "limit": value}
]
}
}
)
assert _has_error_for(errors, "aws.detect_secrets_plugins")
class TestAdapterRobustness:
"""Top-level adapter behaviour the Prowler App backend depends on."""
def test_non_dict_payload(self):
errors = validate_scan_config([1, 2, 3])
assert len(errors) == 1
assert errors[0]["path"] == "<root>"
def test_unknown_provider_section_tolerated(self):
# additionalProperties: True at the root level by design.
assert validate_scan_config({"newprovider": {"foo": "bar"}}) == []
def test_unknown_key_tolerated_by_pydantic_extra_allow(self):
# ProviderConfigBase has extra="allow" for forward compatibility.
assert validate_scan_config({"aws": {"completely_new_knob": 1}}) == []
def test_provider_section_must_be_mapping(self):
errors = validate_scan_config({"aws": "not a mapping"})
assert _has_error_for(errors, "aws")
def test_multiple_errors_surfaced(self):
errors = validate_scan_config(
{
"aws": {
"max_unused_access_keys_days": 5, # below min 30
"max_security_group_rules": 99999, # above max 1000
"ec2_high_risk_ports": [80, 70000], # port out of range
}
}
)
# All three should surface independently.
assert _has_error_for(errors, "aws.max_unused_access_keys_days")
assert _has_error_for(errors, "aws.max_security_group_rules")
assert _has_error_for(errors, "aws.ec2_high_risk_ports")
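The string-format cases in this test file (semver, EKS `MAJOR.MINOR`, Azure dotted versions, 12-digit account IDs) can all be expressed as anchored regular expressions. The patterns below are a sketch reconstructed from the accepted and rejected test values only; the names are hypothetical and the shipped validators may be implemented differently.

```python
import re

# Hypothetical patterns inferred from the test cases in this diff.
SEMVER = re.compile(r"[0-9]+\.[0-9]+\.[0-9]+")   # X.Y.Z
MAJOR_MINOR = re.compile(r"[0-9]+\.[0-9]+")      # X.Y
DOTTED = re.compile(r"[0-9]+(\.[0-9]+)?")        # X or X.Y
ACCOUNT_ID = re.compile(r"[0-9]{12}")            # exactly 12 ASCII digits


def matches(pattern: re.Pattern, value: str) -> bool:
    """Return True only if the whole string matches the pattern."""
    # fullmatch anchors both ends, so "v1.4.0" or "1.4.0-beta" cannot slip through.
    return pattern.fullmatch(value) is not None


print(matches(SEMVER, "1.4.0"), matches(SEMVER, "1.4.0-beta"))
print(matches(MAJOR_MINOR, "1.28"), matches(MAJOR_MINOR, "1.28.0"))
print(matches(DOTTED, "17"), matches(DOTTED, "8.x"))
print(matches(ACCOUNT_ID, "123456789012"), matches(ACCOUNT_ID, "1234-5678-9012"))
```

Using `[0-9]` instead of `\d` keeps non-ASCII digits out, and `fullmatch` avoids the trailing-newline leniency of a `$` anchor.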
@@ -0,0 +1,123 @@
"""End-to-end tests that exercise the real ``load_and_validate_config_file``
through a temp YAML file. Anything that breaks here would break the actual
``prowler aws -c …`` code path."""
import logging
import os
import pathlib
import pytest
from prowler.config.config import load_and_validate_config_file
@pytest.fixture
def write_config(tmp_path):
def _write(content: str) -> str:
path = tmp_path / "config.yaml"
path.write_text(content)
return str(path)
return _write
class Test_Loader_With_Schema_Integration:
def test_shipped_default_config_loads_without_warnings(self, caplog):
"""The default ``prowler/config/config.yaml`` must round-trip every
provider WITHOUT emitting any schema warnings. If this fails,
someone added a key to the YAML without updating the schema."""
repo_root = pathlib.Path(os.path.dirname(os.path.realpath(__file__))).parents[2]
shipped = repo_root / "prowler" / "config" / "config.yaml"
with caplog.at_level(logging.WARNING, logger="prowler"):
for provider in [
"aws",
"azure",
"gcp",
"kubernetes",
"m365",
"github",
"mongodbatlas",
"cloudflare",
"vercel",
]:
cfg = load_and_validate_config_file(provider, str(shipped))
# Provider always exists in the shipped file → non-empty.
assert cfg, f"{provider} returned an empty config"
offending = [
r.getMessage()
for r in caplog.records
if "prowler.config[" in r.getMessage()
]
assert not offending, (
"Shipped config.yaml triggered schema warnings — schema or YAML out of sync:\n"
+ "\n".join(offending)
)
def test_user_config_with_bad_threshold_falls_back(self, write_config, caplog):
path = write_config(
"aws:\n"
" threat_detection_privilege_escalation_threshold: 5.0\n"
" lambda_min_azs: 2\n"
)
with caplog.at_level(logging.WARNING, logger="prowler"):
cfg = load_and_validate_config_file("aws", path)
assert cfg == {"lambda_min_azs": 2}
assert any(
"threat_detection_privilege_escalation_threshold" in r.getMessage()
for r in caplog.records
)
def test_old_format_config_still_works(self, write_config):
# Old format = flat keys, no provider header.
path = write_config(
"max_ec2_instance_age_in_days: 90\n"
"ecr_repository_vulnerability_minimum_severity: HIGH\n"
)
cfg = load_and_validate_config_file("aws", path)
assert cfg == {
"max_ec2_instance_age_in_days": 90,
"ecr_repository_vulnerability_minimum_severity": "HIGH",
}
def test_unknown_keys_pass_through_via_loader(self, write_config):
path = write_config(
"aws:\n" " third_party_plugin_setting: hello\n" " lambda_min_azs: 2\n"
)
cfg = load_and_validate_config_file("aws", path)
assert cfg == {
"third_party_plugin_setting": "hello",
"lambda_min_azs": 2,
}
def test_quoted_numeric_is_coerced_via_loader(self, write_config):
# The number is quoted in the YAML, so ``"180"`` arrives as a Python str.
# The schema must coerce it to int so downstream comparisons work.
path = write_config('aws:\n max_ec2_instance_age_in_days: "180"\n')
cfg = load_and_validate_config_file("aws", path)
assert cfg == {"max_ec2_instance_age_in_days": 180}
assert isinstance(cfg["max_ec2_instance_age_in_days"], int)
def test_invalid_yaml_shape_list_as_string_drops_key(self, write_config, caplog):
path = write_config(
"aws:\n"
" disallowed_regions: me-south-1\n" # forgot list dashes
" lambda_min_azs: 2\n"
)
with caplog.at_level(logging.WARNING, logger="prowler"):
cfg = load_and_validate_config_file("aws", path)
assert cfg == {"lambda_min_azs": 2}
assert any("disallowed_regions" in r.getMessage() for r in caplog.records)
def test_other_providers_unaffected_by_aws_block(self, write_config):
path = write_config(
"aws:\n max_ec2_instance_age_in_days: 90\n" "gcp:\n mig_min_zones: 5\n"
)
assert load_and_validate_config_file("aws", path) == {
"max_ec2_instance_age_in_days": 90
}
assert load_and_validate_config_file("gcp", path) == {"mig_min_zones": 5}
def test_missing_provider_block_returns_empty(self, write_config):
path = write_config("aws:\n max_ec2_instance_age_in_days: 90\n")
assert load_and_validate_config_file("azure", path) == {}
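The loader tests above only assert which keys survive; what a check sees afterwards follows from the `audit_config.get(key, default)` pattern named in the validator docstring. A minimal sketch of that consumer side, with hypothetical default constants chosen for illustration (real defaults live in each check):

```python
# Hypothetical defaults for illustration only.
DEFAULT_LAMBDA_MIN_AZS = 2
DEFAULT_ESCALATION_THRESHOLD = 0.2

# Shape of a loader result after an out-of-range threshold was dropped:
# the valid user-supplied key survives, the invalid one is simply absent.
audit_config = {"lambda_min_azs": 3}

# The surviving key wins over the default.
min_azs = audit_config.get("lambda_min_azs", DEFAULT_LAMBDA_MIN_AZS)

# The dropped key falls back to the built-in default.
threshold = audit_config.get(
    "threat_detection_privilege_escalation_threshold",
    DEFAULT_ESCALATION_THRESHOLD,
)

print(min_azs, threshold)  # 3 0.2
```

Because a dropped key is indistinguishable from one the user never set, the warning log line is the only signal that a value was ignored.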
@@ -0,0 +1,152 @@
"""Smaller-provider schema coverage. One happy path + one invalid path
per field is enough to lock in the contract; the validator behaviour
itself is covered exhaustively in validator_test.py."""
import pytest
from prowler.config.schema.registry import SCHEMAS
from prowler.config.schema.validator import validate_provider_config
def _validate(provider, raw):
return validate_provider_config(provider, raw, SCHEMAS[provider])
class Test_Azure_Schema:
@pytest.mark.parametrize("level", ["Low", "Medium", "High", "Critical"])
def test_defender_risk_level_valid_values(self, level):
assert _validate(
"azure", {"defender_attack_path_minimal_risk_level": level}
) == {"defender_attack_path_minimal_risk_level": level}
def test_defender_risk_level_lowercase_dropped(self):
# Case matters: the matching check uses Title-case comparison.
assert (
_validate("azure", {"defender_attack_path_minimal_risk_level": "high"})
== {}
)
def test_apim_threshold_in_range(self):
out = _validate("azure", {"apim_threat_detection_llm_jacking_threshold": 0.1})
assert out == {"apim_threat_detection_llm_jacking_threshold": 0.1}
def test_apim_threshold_out_of_range(self):
out = _validate("azure", {"apim_threat_detection_llm_jacking_threshold": 1.5})
assert out == {}
def test_vm_backup_retention_must_be_positive(self):
assert _validate("azure", {"vm_backup_min_daily_retention_days": 7}) == {
"vm_backup_min_daily_retention_days": 7
}
assert _validate("azure", {"vm_backup_min_daily_retention_days": 0}) == {}
assert _validate("azure", {"vm_backup_min_daily_retention_days": -1}) == {}
class Test_GCP_Schema:
def test_valid_values_round_trip(self):
raw = {
"mig_min_zones": 2,
"max_snapshot_age_days": 90,
"max_unused_account_days": 180,
"storage_min_retention_days": 90,
}
assert _validate("gcp", raw) == raw
def test_zero_zone_count_dropped(self):
assert _validate("gcp", {"mig_min_zones": 0}) == {}
class Test_Kubernetes_Schema:
def test_valid_values_round_trip(self):
raw = {
"audit_log_maxbackup": 10,
"audit_log_maxsize": 100,
"audit_log_maxage": 30,
}
assert _validate("kubernetes", raw) == raw
def test_negative_audit_log_dropped(self):
assert _validate("kubernetes", {"audit_log_maxage": -1}) == {}
class Test_M365_Schema:
def test_valid_values_round_trip(self):
raw = {
"sign_in_frequency": 4,
"recommended_mailtips_large_audience_threshold": 25,
"audit_log_age": 90,
}
assert _validate("m365", raw) == raw
def test_negative_audit_log_age_dropped(self):
assert _validate("m365", {"audit_log_age": -10}) == {}
class Test_GitHub_Schema:
def test_valid_threshold(self):
assert _validate("github", {"inactive_not_archived_days_threshold": 180}) == {
"inactive_not_archived_days_threshold": 180
}
def test_zero_threshold_dropped(self):
assert _validate("github", {"inactive_not_archived_days_threshold": 0}) == {}
class Test_MongoDBAtlas_Schema:
def test_valid(self):
assert _validate(
"mongodbatlas", {"max_service_account_secret_validity_hours": 8}
) == {"max_service_account_secret_validity_hours": 8}
def test_invalid_negative(self):
assert (
_validate("mongodbatlas", {"max_service_account_secret_validity_hours": -1})
== {}
)
class Test_Cloudflare_Schema:
def test_zero_retries_allowed(self):
# 0 is explicitly documented as "disable retries" in config.yaml.
assert _validate("cloudflare", {"max_retries": 0}) == {"max_retries": 0}
def test_positive_retries_allowed(self):
assert _validate("cloudflare", {"max_retries": 3}) == {"max_retries": 3}
def test_negative_retries_dropped(self):
assert _validate("cloudflare", {"max_retries": -1}) == {}


class Test_Vercel_Schema:
    def test_owner_percentage_in_range(self):
        assert _validate("vercel", {"max_owner_percentage": 20}) == {
            "max_owner_percentage": 20
        }
        assert _validate("vercel", {"max_owner_percentage": 1}) == {
            "max_owner_percentage": 1
        }
        assert _validate("vercel", {"max_owner_percentage": 50}) == {
            "max_owner_percentage": 50
        }

    def test_owner_percentage_over_max_dropped(self):
        # Tightened to 1..50; anything above (incl. previous 100) is dropped.
        assert _validate("vercel", {"max_owner_percentage": 51}) == {}
        assert _validate("vercel", {"max_owner_percentage": 150}) == {}

    def test_owner_percentage_zero_or_negative_dropped(self):
        # 0 is no longer a valid configuration (defeats PoLP signal).
        assert _validate("vercel", {"max_owner_percentage": 0}) == {}
        assert _validate("vercel", {"max_owner_percentage": -1}) == {}

    def test_full_default_config_round_trip(self):
        raw = {
            "stable_branches": ["main", "master"],
            "days_to_expire_threshold": 7,
            "stale_token_threshold_days": 90,
            "stale_invitation_threshold_days": 30,
            "max_owner_percentage": 20,
            "max_owners": 3,
            "secret_suffixes": ["_KEY", "_SECRET", "_TOKEN"],
        }
        assert _validate("vercel", raw) == raw
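
The provider tests above exercise three different lower-bound rules: zero allowed (Cloudflare `max_retries`), strictly positive (GitHub `inactive_not_archived_days_threshold`), and a closed range (Vercel `max_owner_percentage`, 1 to 50). The following is a minimal, dependency-free sketch of those rules, not the real Pydantic schemas. The `bounded_int`, `BOUNDS`, and `keep_valid` names are hypothetical, and the bounds are inferred only from the assertions above.

```python
from typing import Any, Callable, Dict, Optional, Tuple


def bounded_int(minimum: int, maximum: Optional[int] = None) -> Callable[[Any], bool]:
    """Build a predicate that accepts ints in [minimum, maximum]."""

    def is_valid(value: Any) -> bool:
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, int):
            return False
        if value < minimum:
            return False
        return maximum is None or value <= maximum

    return is_valid


# Hypothetical bounds, inferred from the test assertions rather than
# copied from the real provider schemas.
BOUNDS: Dict[Tuple[str, str], Callable[[Any], bool]] = {
    ("cloudflare", "max_retries"): bounded_int(0),  # 0 means "disable retries"
    ("github", "inactive_not_archived_days_threshold"): bounded_int(1),
    ("vercel", "max_owner_percentage"): bounded_int(1, 50),
}


def keep_valid(provider: str, raw: Dict[str, Any]) -> Dict[str, Any]:
    """Drop keys whose value violates its bound; keys without a bound pass through."""
    return {
        key: value
        for key, value in raw.items()
        if BOUNDS.get((provider, key), lambda _value: True)(value)
    }
```

The point of the sketch is that "zero allowed" and "strictly positive" are separate decisions per field, which is why the Cloudflare and GitHub tests assert opposite outcomes for `0`.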
@@ -0,0 +1,175 @@
"""Behavioural tests for ``validate_provider_config``.

The validator is the gatekeeper for every provider schema: its job is to
keep backwards-compatible behaviour (no exceptions, drop only the bad
keys) while loudly logging type mistakes.
"""

import logging

import pytest

from prowler.config.schema.aws import AWSProviderConfig
from prowler.config.schema.registry import SCHEMAS
from prowler.config.schema.validator import validate_provider_config


class Test_Validate_Provider_Config_Contract:
    """Generic invariants that must hold for any schema."""

    def test_returns_empty_dict_when_raw_is_not_a_dict(self):
        assert validate_provider_config("aws", None, AWSProviderConfig) == {}
        assert validate_provider_config("aws", "string", AWSProviderConfig) == {}
        assert validate_provider_config("aws", 42, AWSProviderConfig) == {}
        assert validate_provider_config("aws", [], AWSProviderConfig) == {}

    def test_returns_raw_unchanged_when_no_schema_registered(self):
        raw = {"anything": "goes", "even": [1, 2, 3]}
        assert validate_provider_config("mystery_provider", raw, None) == raw

    def test_unknown_keys_pass_through_for_plugin_compatibility(self):
        # Third-party plugins inject arbitrary keys; the schema must NOT
        # filter them. This is the contract that lets the plugin ecosystem
        # keep working when we add validation.
        raw = {"plugin_custom_key": "foo", "lambda_min_azs": 2}
        assert validate_provider_config("aws", raw, AWSProviderConfig) == {
            "plugin_custom_key": "foo",
            "lambda_min_azs": 2,
        }

    def test_empty_dict_returns_empty_dict(self):
        assert validate_provider_config("aws", {}, AWSProviderConfig) == {}

    def test_known_valid_value_passes_through_unchanged(self):
        raw = {"max_ec2_instance_age_in_days": 180}
        assert validate_provider_config("aws", raw, AWSProviderConfig) == {
            "max_ec2_instance_age_in_days": 180
        }


class Test_Validate_Provider_Config_Coercion:
    """Pydantic v2 coerces common type-mistakes automatically. We want to
    keep that behaviour so quoted numerics in user configs ``Just Work``."""

    def test_string_numeric_is_coerced_to_int(self):
        out = validate_provider_config(
            "aws", {"max_ec2_instance_age_in_days": "180"}, AWSProviderConfig
        )
        assert out == {"max_ec2_instance_age_in_days": 180}
        assert isinstance(out["max_ec2_instance_age_in_days"], int)

    def test_string_numeric_is_coerced_to_float(self):
        out = validate_provider_config(
            "aws",
            {"threat_detection_privilege_escalation_threshold": "0.4"},
            AWSProviderConfig,
        )
        assert out == {"threat_detection_privilege_escalation_threshold": 0.4}


class Test_Validate_Provider_Config_Drops_Invalid_Keys:
    """When a field fails validation, only that key is dropped from the
    returned dict. The rest of the user's config is preserved so the
    consumer's ``audit_config.get(key, default)`` falls back to its own
    built-in default for the offending field and uses user values for
    everything else."""

    def test_out_of_range_threshold_is_dropped(self, caplog):
        with caplog.at_level(logging.WARNING):
            out = validate_provider_config(
                "aws",
                {
                    "threat_detection_privilege_escalation_threshold": 2.0,
                    "lambda_min_azs": 2,
                },
                AWSProviderConfig,
            )
        assert out == {"lambda_min_azs": 2}
        assert any(
            "threat_detection_privilege_escalation_threshold" in r.getMessage()
            for r in caplog.records
        )

    def test_invalid_enum_is_dropped(self):
        out = validate_provider_config(
            "aws",
            {"ecr_repository_vulnerability_minimum_severity": "medum"},
            AWSProviderConfig,
        )
        assert out == {}

    def test_wrong_shape_list_as_string_is_dropped(self):
        # Classic YAML mistake: ``disallowed_regions: me-south-1`` without dashes.
        # Pydantic refuses to silently treat a str as a single-element list,
        # which is exactly the safety guarantee we want.
        out = validate_provider_config(
            "aws",
            {"disallowed_regions": "me-south-1", "lambda_min_azs": 2},
            AWSProviderConfig,
        )
        assert out == {"lambda_min_azs": 2}

    def test_negative_positive_int_is_dropped(self):
        out = validate_provider_config(
            "aws", {"max_ec2_instance_age_in_days": -1}, AWSProviderConfig
        )
        assert out == {}

    def test_zero_is_dropped_for_strictly_positive_field(self):
        # max_ec2_instance_age_in_days is gt=0. Zero would silently cause every
        # instance to FAIL the age check.
        out = validate_provider_config(
            "aws", {"max_ec2_instance_age_in_days": 0}, AWSProviderConfig
        )
        assert out == {}

    def test_multiple_invalid_keys_yield_multiple_warnings(self, caplog):
        with caplog.at_level(logging.WARNING):
            out = validate_provider_config(
                "aws",
                {
                    "max_ec2_instance_age_in_days": "nope",
                    "ecr_repository_vulnerability_minimum_severity": "medum",
                    "valid_extra_key": "kept",
                },
                AWSProviderConfig,
            )
        assert out == {"valid_extra_key": "kept"}
        messages = " ".join(r.getMessage() for r in caplog.records)
        assert "max_ec2_instance_age_in_days" in messages
        assert "ecr_repository_vulnerability_minimum_severity" in messages

    def test_warning_message_includes_provider_and_field(self, caplog):
        with caplog.at_level(logging.WARNING):
            validate_provider_config(
                "aws",
                {"threat_detection_privilege_escalation_threshold": 5.0},
                AWSProviderConfig,
            )
        assert any(
            "prowler.config[aws.threat_detection_privilege_escalation_threshold]"
            in r.getMessage()
            for r in caplog.records
        )


class Test_Schemas_Registry:
    """Every provider mentioned in the YAML config must have a schema."""

    @pytest.mark.parametrize(
        "provider",
        [
            "aws",
            "azure",
            "gcp",
            "kubernetes",
            "m365",
            "github",
            "mongodbatlas",
            "cloudflare",
            "vercel",
        ],
    )
    def test_schema_registered_for_provider(self, provider):
        assert provider in SCHEMAS
        assert SCHEMAS[provider] is not None
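
The behavioural contract these tests pin down (non-dict input yields `{}`, a missing schema returns the input untouched, unknown keys pass through, and only the failing keys are dropped with a warning) can be illustrated without Pydantic. The following is a rough standard-library sketch of that contract, not the actual `validate_provider_config` implementation. `validate_sketch`, `SKETCH_SCHEMA`, the per-field checker functions, and the log message format are all hypothetical.

```python
import logging
from typing import Any, Callable, Dict, List, Optional

logger = logging.getLogger("sketch.config")


def positive_int(value: Any) -> int:
    """Accept ints and numeric strings greater than zero (lenient coercion)."""
    if isinstance(value, bool) or not isinstance(value, (int, str)):
        raise ValueError("expected an integer")
    number = int(value)  # raises ValueError for strings such as "nope"
    if number <= 0:
        raise ValueError("must be greater than 0")
    return number


def string_list(value: Any) -> List[str]:
    """Refuse to treat a bare string as a single-element list."""
    if not isinstance(value, list) or not all(isinstance(v, str) for v in value):
        raise ValueError("expected a list of strings")
    return value


# Hypothetical stand-in for a provider schema: field name -> checker.
SKETCH_SCHEMA: Dict[str, Callable[[Any], Any]] = {
    "max_age_days": positive_int,
    "regions": string_list,
}


def validate_sketch(
    provider: str,
    raw: Any,
    schema: Optional[Dict[str, Callable[[Any], Any]]],
) -> Any:
    if not isinstance(raw, dict):
        return {}  # never raise on malformed input
    if schema is None:
        return raw  # no schema registered: nothing to check
    cleaned: Dict[str, Any] = {}
    for key, value in raw.items():
        check = schema.get(key)
        if check is None:
            cleaned[key] = value  # unknown key: preserved for plugins
            continue
        try:
            cleaned[key] = check(value)
        except ValueError as error:
            # Drop only the offending key; the consumer's .get(key, default)
            # then falls back to its built-in default.
            logger.warning("config[%s.%s]: %s; using default", provider, key, error)
    return cleaned
```

For example, `validate_sketch("aws", {"max_age_days": "180", "plugin_key": "foo"}, SKETCH_SCHEMA)` keeps both keys and coerces the quoted number, while a bare string for `regions` is dropped and logged.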