feat(config): add SDK config's validator (#11518)

Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
This commit is contained in:
Pedro Martín
2026-06-23 09:40:32 +02:00
committed by GitHub
parent 2afa18d3da
commit 0610866b73
25 changed files with 2342 additions and 1 deletions
View File
+222
View File
@@ -0,0 +1,222 @@
"""AWS-specific schema coverage — the biggest provider, with the richest
constraint surface (CIDRs, account IDs, port ranges, enums, thresholds)."""
import pytest
from prowler.config.schema.aws import AWSProviderConfig
from prowler.config.schema.validator import validate_provider_config
def _validate(raw):
    """Validate ``raw`` against ``AWSProviderConfig`` under the ``aws`` provider."""
    provider = "aws"
    return validate_provider_config(provider, raw, AWSProviderConfig)
class Test_AWS_Threat_Detection_Thresholds:
    """Threat detection thresholds are documented as fractions in 0..1.

    Mistyping one of them risks silently disabling the check, so values
    outside the range (or of the wrong type) are expected to be dropped.
    """

    # Shared by both parametrized tests below.
    _THRESHOLD_KEYS = [
        "threat_detection_privilege_escalation_threshold",
        "threat_detection_enumeration_threshold",
        "threat_detection_llm_jacking_threshold",
    ]

    @pytest.mark.parametrize("key", _THRESHOLD_KEYS)
    def test_valid_boundary_values(self, key):
        """Both ends of the range and a mid-point round-trip unchanged."""
        for value in (0.0, 1.0, 0.5):
            assert _validate({key: value}) == {key: value}

    @pytest.mark.parametrize("key", _THRESHOLD_KEYS)
    def test_invalid_values_are_dropped(self, key):
        """Out-of-range and non-numeric values are removed from the result."""
        rejected = (
            20,  # 20 instead of 0.2 — would never trigger
            -0.1,  # negative
            "high",  # string
        )
        for value in rejected:
            assert _validate({key: value}) == {}
class Test_AWS_Trusted_Account_Ids:
    """Checks on the ``trusted_account_ids`` list of the AWS schema."""

    def test_valid_twelve_digit_ids(self):
        account_ids = ["123456789012", "098765432109"]
        validated = _validate({"trusted_account_ids": account_ids})
        assert validated == {"trusted_account_ids": account_ids}

    def test_empty_list_is_valid(self):
        validated = _validate({"trusted_account_ids": []})
        assert validated == {"trusted_account_ids": []}

    def test_short_id_is_dropped(self):
        validated = _validate({"trusted_account_ids": ["12345"]})
        assert validated == {}

    def test_non_numeric_id_is_dropped(self):
        validated = _validate({"trusted_account_ids": ["1234abcd5678"]})
        assert validated == {}

    def test_id_with_dashes_is_dropped(self):
        # Some users write account IDs in the "1234-5678-9012" form.
        validated = _validate({"trusted_account_ids": ["1234-5678-9012"]})
        assert validated == {}
class Test_AWS_Trusted_Ips:
    """Checks on the ``trusted_ips`` list: addresses and CIDRs, v4 and v6."""

    def test_single_ipv4_address(self):
        validated = _validate({"trusted_ips": ["1.2.3.4"]})
        assert validated == {"trusted_ips": ["1.2.3.4"]}

    def test_ipv4_cidr(self):
        validated = _validate({"trusted_ips": ["10.0.0.0/8"]})
        assert validated == {"trusted_ips": ["10.0.0.0/8"]}

    def test_ipv6_address(self):
        validated = _validate({"trusted_ips": ["2001:db8::1"]})
        assert validated == {"trusted_ips": ["2001:db8::1"]}

    def test_ipv6_cidr(self):
        validated = _validate({"trusted_ips": ["2001:db8::/32"]})
        assert validated == {"trusted_ips": ["2001:db8::/32"]}

    def test_mixed_list(self):
        addresses = ["1.2.3.4", "10.0.0.0/8", "2001:db8::1"]
        validated = _validate({"trusted_ips": addresses})
        assert validated == {"trusted_ips": addresses}

    def test_garbage_entry_is_dropped(self):
        validated = _validate({"trusted_ips": ["definitely-not-an-ip"]})
        assert validated == {}

    def test_cidr_with_host_bits_is_accepted(self):
        # strict=False is used, so "10.0.0.5/8" is accepted. That matches
        # what most security tools do and does not surprise users who paste
        # real-world allowlists written in non-canonical CIDR notation.
        validated = _validate({"trusted_ips": ["10.0.0.5/8"]})
        assert validated == {"trusted_ips": ["10.0.0.5/8"]}
class Test_AWS_Ports:
    """Checks on the ``ec2_high_risk_ports`` list of the AWS schema."""

    def test_valid_ports_in_range(self):
        port_list = [25, 80, 443, 65535, 1]
        validated = _validate({"ec2_high_risk_ports": port_list})
        assert validated == {"ec2_high_risk_ports": port_list}

    def test_port_zero_is_dropped(self):
        # Port 0 is reserved and is not a valid security signal.
        validated = _validate({"ec2_high_risk_ports": [0]})
        assert validated == {}

    def test_out_of_range_port_is_dropped(self):
        validated = _validate({"ec2_high_risk_ports": [70000]})
        assert validated == {}

    def test_negative_port_is_dropped(self):
        validated = _validate({"ec2_high_risk_ports": [-1]})
        assert validated == {}
class Test_AWS_Enums:
    """Checks on ``ecr_repository_vulnerability_minimum_severity``.

    The field is a closed, case-sensitive set of upper-case severity names.
    """

    # INFORMATIONAL is included so this list agrees with the accepted values
    # exercised by TestECRSeverityLiteral in the bounds tests.
    @pytest.mark.parametrize(
        "level", ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFORMATIONAL"]
    )
    def test_valid_severity_levels(self, level):
        """Every documented severity round-trips unchanged."""
        assert _validate({"ecr_repository_vulnerability_minimum_severity": level}) == {
            "ecr_repository_vulnerability_minimum_severity": level
        }

    @pytest.mark.parametrize("level", ["critical", "Medium", "ANY", "", "X"])
    def test_invalid_severity_levels_are_dropped(self, level):
        """Wrong case, unknown names and the empty string are all dropped."""
        assert _validate({"ecr_repository_vulnerability_minimum_severity": level}) == {}
class Test_AWS_Detect_Secrets_Plugins:
    """Checks on the ``detect_secrets_plugins`` list of plugin mappings."""

    def test_plugin_without_limit(self):
        validated = _validate({"detect_secrets_plugins": [{"name": "AWSKeyDetector"}]})
        expected = {"detect_secrets_plugins": [{"name": "AWSKeyDetector"}]}
        assert validated == expected

    def test_plugin_with_limit(self):
        payload = {
            "detect_secrets_plugins": [
                {"name": "Base64HighEntropyString", "limit": 6.0}
            ]
        }
        expected = {
            "detect_secrets_plugins": [
                {"name": "Base64HighEntropyString", "limit": 6.0}
            ]
        }
        assert _validate(payload) == expected

    def test_plugin_missing_name_drops_whole_field(self):
        # The upstream library requires ``name``.
        validated = _validate({"detect_secrets_plugins": [{"limit": 6.0}]})
        assert validated == {}

    def test_extra_plugin_kwargs_pass_through(self):
        # The nested model uses extra="allow", so plugins may carry
        # arbitrary extra params. Those params must round-trip.
        payload = {
            "detect_secrets_plugins": [
                {"name": "Custom", "my_param": "abc", "other": 42}
            ]
        }
        expected = {
            "detect_secrets_plugins": [
                {"name": "Custom", "my_param": "abc", "other": 42}
            ]
        }
        assert _validate(payload) == expected
class Test_AWS_Booleans:
    """Checks on boolean fields of the AWS schema."""

    @pytest.mark.parametrize(
        "key",
        [
            "mute_non_default_regions",
            "verify_premium_support_plans",
            "check_rds_instance_replicas",
        ],
    )
    def test_true_and_false_round_trip(self, key):
        for flag in (True, False):
            assert _validate({key: flag}) == {key: flag}

    def test_yaml_style_boolean_coercion(self):
        # A quoted YAML value reaches Python as the str "true"/"yes".
        # In lax mode Pydantic v2 deterministically turns
        # "yes"/"no"/"true"/"false" into a real bool, so the value is
        # normalized instead of being passed through as a string (which would
        # be dangerous for verify_premium_support_plans).
        field = "verify_premium_support_plans"
        validated = _validate({field: "yes"})
        assert field in validated
        assert isinstance(validated[field], bool)
        assert validated[field] is True
class Test_AWS_Full_Default_Config_Round_Trips:
    """Default-like values pushed through the schema must come back as
    exactly the same dict. This guards backwards compatibility."""

    def test_full_default_config_round_trip(self):
        # A subset mirroring the semantics of the shipped config.yaml.
        secrets_plugins = [
            {"name": "AWSKeyDetector"},
            {"name": "Base64HighEntropyString", "limit": 6.0},
        ]
        defaults_subset = {
            "mute_non_default_regions": False,
            "disallowed_regions": ["me-south-1", "me-central-1"],
            "max_unused_access_keys_days": 45,
            "max_ec2_instance_age_in_days": 180,
            "trusted_account_ids": [],
            "trusted_ips": [],
            "ecr_repository_vulnerability_minimum_severity": "MEDIUM",
            "threat_detection_privilege_escalation_threshold": 0.2,
            "threat_detection_enumeration_threshold": 0.3,
            "threat_detection_llm_jacking_threshold": 0.4,
            "ec2_high_risk_ports": [25, 110, 8088],
            "detect_secrets_plugins": secrets_plugins,
        }
        validated = _validate(defaults_subset)
        assert validated == defaults_subset
+398
View File
@@ -0,0 +1,398 @@
"""Boundary tests for the safety bounds added on top of the upstream schemas.
Each parametrised case checks (a) the min and max values are accepted and
(b) one step outside the range is rejected. Custom validators (semver,
EKS minor, dotted version, port range, account IDs, IPs) get focused
positive/negative tests.
Tests use the public adapter ``prowler.config.scan_config_schema``: a
schema violation surfaces as a list of ``{"path", "message"}`` entries.
This keeps the contract the Prowler App backend depends on under test.
"""
import pytest
from prowler.config.scan_config_schema import validate_scan_config
def _has_error_for(errors: list[dict], path_substr: str) -> bool:
return any(path_substr in e["path"] for e in errors)
# Inclusive integer bounds, consumed by the parametrized tests in
# ``TestIntegerBounds``: ``lo`` and ``hi`` must be accepted, ``lo - 1`` and
# ``hi + 1`` must be rejected.
# Each tuple: (provider, key, min_allowed, max_allowed)
INT_BOUND_CASES = [
    # AWS
    ("aws", "max_unused_access_keys_days", 30, 180),
    ("aws", "max_console_access_days", 30, 180),
    ("aws", "max_unused_sagemaker_access_days", 7, 180),
    ("aws", "max_security_group_rules", 1, 1000),
    ("aws", "max_ec2_instance_age_in_days", 1, 1095),
    ("aws", "recommended_cdk_bootstrap_version", 1, 100),
    ("aws", "max_idle_disconnect_timeout_in_seconds", 60, 1800),
    ("aws", "max_disconnect_timeout_in_seconds", 60, 3600),
    ("aws", "max_session_duration_seconds", 600, 86400),
    ("aws", "lambda_min_azs", 1, 6),
    ("aws", "threat_detection_privilege_escalation_minutes", 5, 43200),
    ("aws", "threat_detection_enumeration_minutes", 5, 43200),
    ("aws", "threat_detection_llm_jacking_minutes", 5, 43200),
    ("aws", "days_to_expire_threshold", 7, 365),
    ("aws", "elb_min_azs", 1, 6),
    ("aws", "elbv2_min_azs", 1, 6),
    ("aws", "minimum_snapshot_retention_period", 1, 35),
    ("aws", "max_days_secret_unused", 7, 365),
    ("aws", "max_days_secret_unrotated", 1, 180),
    ("aws", "min_kinesis_stream_retention_hours", 24, 8760),
    # Azure
    ("azure", "vm_backup_min_daily_retention_days", 7, 9999),
    ("azure", "apim_threat_detection_llm_jacking_minutes", 5, 43200),
    # GCP
    ("gcp", "mig_min_zones", 1, 5),
    ("gcp", "max_snapshot_age_days", 1, 1095),
    ("gcp", "max_unused_account_days", 30, 365),
    ("gcp", "storage_min_retention_days", 1, 3650),
    # Kubernetes
    ("kubernetes", "audit_log_maxbackup", 2, 1000),
    ("kubernetes", "audit_log_maxsize", 10, 10000),
    ("kubernetes", "audit_log_maxage", 7, 3650),
    # M365
    ("m365", "sign_in_frequency", 1, 168),
    ("m365", "recommended_mailtips_large_audience_threshold", 5, 10000),
    ("m365", "audit_log_age", 30, 3650),
    # GitHub
    ("github", "inactive_not_archived_days_threshold", 30, 3650),
    # MongoDB Atlas
    ("mongodbatlas", "max_service_account_secret_validity_hours", 1, 720),
    # Cloudflare
    ("cloudflare", "max_retries", 0, 10),
    # Vercel
    ("vercel", "days_to_expire_threshold", 7, 365),
    ("vercel", "stale_token_threshold_days", 30, 3650),
    ("vercel", "stale_invitation_threshold_days", 7, 365),
    ("vercel", "max_owner_percentage", 1, 50),
    ("vercel", "max_owners", 1, 1000),
]
# Float threshold fields, consumed by ``TestFloatThresholds``, which checks
# that each one stays within 0..1 inclusive.
# Each tuple: (provider, key)
FLOAT_THRESHOLD_FIELDS = [
    ("aws", "threat_detection_privilege_escalation_threshold"),
    ("aws", "threat_detection_enumeration_threshold"),
    ("aws", "threat_detection_llm_jacking_threshold"),
    ("azure", "apim_threat_detection_llm_jacking_threshold"),
]
class TestIntegerBounds:
    """Every int field takes both ends of its range and refuses ±1 outside."""

    @pytest.mark.parametrize("provider, key, lo, hi", INT_BOUND_CASES)
    def test_min_accepted(self, provider, key, lo, hi):
        payload = {provider: {key: lo}}
        assert validate_scan_config(payload) == []

    @pytest.mark.parametrize("provider, key, lo, hi", INT_BOUND_CASES)
    def test_max_accepted(self, provider, key, lo, hi):
        payload = {provider: {key: hi}}
        assert validate_scan_config(payload) == []

    @pytest.mark.parametrize("provider, key, lo, hi", INT_BOUND_CASES)
    def test_below_min_rejected(self, provider, key, lo, hi):
        too_small = lo - 1
        errors = validate_scan_config({provider: {key: too_small}})
        assert _has_error_for(errors, f"{provider}.{key}"), errors

    @pytest.mark.parametrize("provider, key, lo, hi", INT_BOUND_CASES)
    def test_above_max_rejected(self, provider, key, lo, hi):
        too_large = hi + 1
        errors = validate_scan_config({provider: {key: too_large}})
        assert _has_error_for(errors, f"{provider}.{key}"), errors
class TestFloatThresholds:
    """Float thresholds are confined to the inclusive range 0..1."""

    @pytest.mark.parametrize("provider, key", FLOAT_THRESHOLD_FIELDS)
    def test_zero_and_one_accepted(self, provider, key):
        for value in (0.0, 1.0, 0.5):
            assert validate_scan_config({provider: {key: value}}) == []

    @pytest.mark.parametrize("provider, key", FLOAT_THRESHOLD_FIELDS)
    def test_negative_rejected(self, provider, key):
        payload = {provider: {key: -0.01}}
        errors = validate_scan_config(payload)
        assert _has_error_for(errors, f"{provider}.{key}")

    @pytest.mark.parametrize("provider, key", FLOAT_THRESHOLD_FIELDS)
    def test_above_one_rejected(self, provider, key):
        payload = {provider: {key: 1.01}}
        errors = validate_scan_config(payload)
        assert _has_error_for(errors, f"{provider}.{key}")
class TestCloudWatchRetention:
    """`log_group_retention_days` is limited to the AWS-approved enum values."""

    @pytest.mark.parametrize("value", [1, 7, 30, 365, 731, 3653])
    def test_valid_values_accepted(self, value):
        payload = {"aws": {"log_group_retention_days": value}}
        assert validate_scan_config(payload) == []

    @pytest.mark.parametrize("value", [0, 2, 42, 500, 999, 4000])
    def test_invalid_values_rejected(self, value):
        payload = {"aws": {"log_group_retention_days": value}}
        errors = validate_scan_config(payload)
        assert _has_error_for(errors, "aws.log_group_retention_days")
class TestSemverValidator:
    """AWS Fargate platform versions use the X.Y.Z form."""

    @pytest.mark.parametrize("value", ["1.4.0", "1.0.0", "0.0.1", "10.20.30"])
    def test_accepts_semver(self, value):
        payload = {"aws": {"fargate_linux_latest_version": value}}
        assert validate_scan_config(payload) == []

    @pytest.mark.parametrize("value", ["1.4", "1", "v1.4.0", "1.4.0-beta", "a.b.c", ""])
    def test_rejects_non_semver(self, value):
        payload = {"aws": {"fargate_linux_latest_version": value}}
        errors = validate_scan_config(payload)
        assert _has_error_for(errors, "aws.fargate_linux_latest_version")
class TestEksVersionValidator:
    """`eks_cluster_oldest_version_supported` takes the MAJOR.MINOR form."""

    @pytest.mark.parametrize("value", ["1.28", "1.29", "1.30", "2.0"])
    def test_accepts_minor(self, value):
        payload = {"aws": {"eks_cluster_oldest_version_supported": value}}
        assert validate_scan_config(payload) == []

    @pytest.mark.parametrize("value", ["1.28.0", "v1.28", "1", "1.x", ""])
    def test_rejects_invalid(self, value):
        payload = {"aws": {"eks_cluster_oldest_version_supported": value}}
        errors = validate_scan_config(payload)
        assert _has_error_for(errors, "aws.eks_cluster_oldest_version_supported")
class TestEksLogTypesEnum:
    """`eks_required_log_types` takes only the documented log types."""

    def test_full_enum_accepted(self):
        every_log_type = [
            "api",
            "audit",
            "authenticator",
            "controllerManager",
            "scheduler",
        ]
        payload = {"aws": {"eks_required_log_types": every_log_type}}
        assert validate_scan_config(payload) == []

    def test_unknown_type_rejected(self):
        payload = {"aws": {"eks_required_log_types": ["api", "telemetry"]}}
        errors = validate_scan_config(payload)
        assert _has_error_for(errors, "aws.eks_required_log_types")
class TestAzureDottedVersion:
    """App Service versions take 'X' and 'X.Y', never 'X.Y.Z' or junk."""

    @pytest.mark.parametrize("value", ["8.2", "3.12", "17"])
    def test_accepts(self, value):
        version_keys = (
            "php_latest_version",
            "python_latest_version",
            "java_latest_version",
        )
        for version_key in version_keys:
            assert validate_scan_config({"azure": {version_key: value}}) == []

    @pytest.mark.parametrize("value", ["8.2.0", "v8", "8.x", ""])
    def test_rejects(self, value):
        payload = {"azure": {"php_latest_version": value}}
        errors = validate_scan_config(payload)
        assert _has_error_for(errors, "azure.php_latest_version")
class TestAzureTlsLiteralEnum:
    """The recommended list tolerates TLS 1.2 and 1.3 only."""

    def test_accepted_versions(self):
        payload = {"azure": {"recommended_minimal_tls_versions": ["1.2", "1.3"]}}
        assert validate_scan_config(payload) == []

    @pytest.mark.parametrize("value", ["1.0", "1.1", "2.0", ""])
    def test_unknown_version_rejected(self, value):
        payload = {"azure": {"recommended_minimal_tls_versions": [value]}}
        errors = validate_scan_config(payload)
        assert _has_error_for(errors, "azure.recommended_minimal_tls_versions")
class TestAzureRiskLevelLiteral:
    """The Defender attack-path risk level is a closed, case-sensitive enum."""

    @pytest.mark.parametrize("value", ["Low", "Medium", "High", "Critical"])
    def test_accepted(self, value):
        payload = {"azure": {"defender_attack_path_minimal_risk_level": value}}
        assert validate_scan_config(payload) == []

    @pytest.mark.parametrize("value", ["low", "CRITICAL", "Severe", ""])
    def test_rejected(self, value):
        payload = {"azure": {"defender_attack_path_minimal_risk_level": value}}
        errors = validate_scan_config(payload)
        assert _has_error_for(errors, "azure.defender_attack_path_minimal_risk_level")
class TestECRSeverityLiteral:
    """The ECR severity is a closed enum, INFORMATIONAL included."""

    @pytest.mark.parametrize(
        "value",
        ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFORMATIONAL"],
    )
    def test_accepted(self, value):
        payload = {"aws": {"ecr_repository_vulnerability_minimum_severity": value}}
        assert validate_scan_config(payload) == []

    @pytest.mark.parametrize("value", ["URGENT", "low", "Crit", ""])
    def test_rejected(self, value):
        payload = {"aws": {"ecr_repository_vulnerability_minimum_severity": value}}
        errors = validate_scan_config(payload)
        expected_path = "aws.ecr_repository_vulnerability_minimum_severity"
        assert _has_error_for(errors, expected_path)
class TestPortRangeValidator:
    """Entries of `ec2_high_risk_ports` must lie in 1..65535 (0 is reserved)."""

    def test_valid_ports(self):
        payload = {"aws": {"ec2_high_risk_ports": [1, 22, 8080, 65535]}}
        assert validate_scan_config(payload) == []

    @pytest.mark.parametrize("value", [-1, 0, 65536, 99999])
    def test_invalid_port_rejected(self, value):
        # One good port next to the bad one: the field must still be flagged.
        payload = {"aws": {"ec2_high_risk_ports": [80, value]}}
        errors = validate_scan_config(payload)
        assert _has_error_for(errors, "aws.ec2_high_risk_ports")
class TestAccountIdsValidator:
    """An AWS account ID is a string of exactly 12 digits."""

    def test_valid(self):
        payload = {"aws": {"trusted_account_ids": ["123456789012", "098765432109"]}}
        assert validate_scan_config(payload) == []

    @pytest.mark.parametrize(
        "value", ["12345", "12345678901", "1234567890123", "12345678901a"]
    )
    def test_invalid_rejected(self, value):
        payload = {"aws": {"trusted_account_ids": [value]}}
        errors = validate_scan_config(payload)
        assert _has_error_for(errors, "aws.trusted_account_ids")
class TestTrustedIpsValidator:
    """Trusted IPs may be IPv4, IPv6 or CIDR; junk is rejected."""

    @pytest.mark.parametrize(
        "value",
        ["1.2.3.4", "10.0.0.0/8", "2001:db8::1", "2001:db8::/32"],
    )
    def test_valid(self, value):
        payload = {"aws": {"trusted_ips": [value]}}
        assert validate_scan_config(payload) == []

    @pytest.mark.parametrize(
        "value", ["not.an.ip", "1.2.3.300", "10.0.0.0/40", "::ffff:::"]
    )
    def test_invalid_rejected(self, value):
        payload = {"aws": {"trusted_ips": [value]}}
        errors = validate_scan_config(payload)
        assert _has_error_for(errors, "aws.trusted_ips")
class TestDetectSecretsEntropyBound:
    """`detect_secrets_plugins[].limit` is a Shannon entropy in 0..10."""

    @pytest.mark.parametrize("value", [0.0, 3.5, 4.5, 8.0, 10.0])
    def test_valid(self, value):
        plugin = {"name": "Base64HighEntropyString", "limit": value}
        payload = {"aws": {"detect_secrets_plugins": [plugin]}}
        assert validate_scan_config(payload) == []

    @pytest.mark.parametrize("value", [-0.1, 10.01, 50])
    def test_invalid(self, value):
        plugin = {"name": "Base64HighEntropyString", "limit": value}
        payload = {"aws": {"detect_secrets_plugins": [plugin]}}
        errors = validate_scan_config(payload)
        assert _has_error_for(errors, "aws.detect_secrets_plugins")
class TestAdapterRobustness:
    """Adapter-level behaviour that the Prowler App backend relies on."""

    def test_non_dict_payload(self):
        errors = validate_scan_config([1, 2, 3])
        assert len(errors) == 1
        (only_error,) = errors
        assert only_error["path"] == "<root>"

    def test_unknown_provider_section_tolerated(self):
        # By design the root level has additionalProperties: True.
        payload = {"newprovider": {"foo": "bar"}}
        assert validate_scan_config(payload) == []

    def test_unknown_key_tolerated_by_pydantic_extra_allow(self):
        # ProviderConfigBase sets extra="allow" for forward compatibility.
        payload = {"aws": {"completely_new_knob": 1}}
        assert validate_scan_config(payload) == []

    def test_provider_section_must_be_mapping(self):
        errors = validate_scan_config({"aws": "not a mapping"})
        assert _has_error_for(errors, "aws")

    def test_multiple_errors_surfaced(self):
        aws_section = {
            "max_unused_access_keys_days": 5,  # below min 30
            "max_security_group_rules": 99999,  # above max 1000
            "ec2_high_risk_ports": [80, 70000],  # port out of range
        }
        errors = validate_scan_config({"aws": aws_section})
        # Each of the three must be reported independently.
        for expected_path in (
            "aws.max_unused_access_keys_days",
            "aws.max_security_group_rules",
            "aws.ec2_high_risk_ports",
        ):
            assert _has_error_for(errors, expected_path)
@@ -0,0 +1,124 @@
"""End-to-end tests that exercise the real ``load_and_validate_config_file``
through a temp YAML file. Anything that breaks here would break the actual
``prowler aws -c …`` code path."""
import logging
import os
import pathlib
from typing import Callable
import pytest
from prowler.config.config import load_and_validate_config_file
@pytest.fixture
def write_config(tmp_path: pathlib.Path) -> Callable[[str], str]:
    """Provide a helper that writes YAML text to a temp ``config.yaml``.

    The returned callable takes the file content, writes it to
    ``tmp_path / "config.yaml"`` and returns that path as a ``str``. Each
    call overwrites the same file.
    """

    def _write(content: str) -> str:
        path = tmp_path / "config.yaml"
        # Pin the encoding so the bytes on disk do not depend on the
        # platform's locale default.
        path.write_text(content, encoding="utf-8")
        return str(path)

    return _write
class Test_Loader_With_Schema_Integration:
    """Drives ``load_and_validate_config_file`` against real YAML files."""

    def test_shipped_default_config_loads_without_warnings(self, caplog):
        """The default ``prowler/config/config.yaml`` has to round-trip every
        provider WITHOUT any schema warning being emitted. A failure here
        means a key was added to the YAML without updating the schema."""
        this_dir = pathlib.Path(os.path.dirname(os.path.realpath(__file__)))
        shipped = this_dir.parents[2] / "prowler" / "config" / "config.yaml"
        providers = [
            "aws",
            "azure",
            "gcp",
            "kubernetes",
            "m365",
            "github",
            "mongodbatlas",
            "cloudflare",
            "vercel",
        ]
        with caplog.at_level(logging.WARNING, logger="prowler"):
            for provider in providers:
                cfg = load_and_validate_config_file(provider, str(shipped))
                # The shipped file always has the provider → non-empty.
                assert cfg, f"{provider} returned an empty config"
        offending = []
        for record in caplog.records:
            message = record.getMessage()
            if "prowler.config[" in message:
                offending.append(message)
        assert not offending, (
            "Shipped config.yaml triggered schema warnings — schema or YAML out of sync:\n"
            + "\n".join(offending)
        )

    def test_user_config_with_bad_threshold_falls_back(self, write_config, caplog):
        config_path = write_config(
            "aws:\n"
            " threat_detection_privilege_escalation_threshold: 5.0\n"
            " lambda_min_azs: 2\n"
        )
        with caplog.at_level(logging.WARNING, logger="prowler"):
            cfg = load_and_validate_config_file("aws", config_path)
        assert cfg == {"lambda_min_azs": 2}
        logged = [record.getMessage() for record in caplog.records]
        assert any(
            "threat_detection_privilege_escalation_threshold" in message
            for message in logged
        )

    def test_old_format_config_still_works(self, write_config):
        # Old format means flat keys with no provider header.
        config_path = write_config(
            "max_ec2_instance_age_in_days: 90\n"
            "ecr_repository_vulnerability_minimum_severity: HIGH\n"
        )
        expected = {
            "max_ec2_instance_age_in_days": 90,
            "ecr_repository_vulnerability_minimum_severity": "HIGH",
        }
        assert load_and_validate_config_file("aws", config_path) == expected

    def test_unknown_keys_pass_through_via_loader(self, write_config):
        config_path = write_config(
            "aws:\n"
            " third_party_plugin_setting: hello\n"
            " lambda_min_azs: 2\n"
        )
        expected = {
            "third_party_plugin_setting": "hello",
            "lambda_min_azs": 2,
        }
        assert load_and_validate_config_file("aws", config_path) == expected

    def test_quoted_numeric_is_coerced_via_loader(self, write_config):
        # The number is quoted in YAML, so ``"180"`` arrives as a Python str.
        # The schema has to turn it into an int for downstream comparisons.
        config_path = write_config('aws:\n max_ec2_instance_age_in_days: "180"\n')
        cfg = load_and_validate_config_file("aws", config_path)
        assert cfg == {"max_ec2_instance_age_in_days": 180}
        assert isinstance(cfg["max_ec2_instance_age_in_days"], int)

    def test_invalid_yaml_shape_list_as_string_drops_key(self, write_config, caplog):
        config_path = write_config(
            "aws:\n"
            " disallowed_regions: me-south-1\n"  # list dashes forgotten
            " lambda_min_azs: 2\n"
        )
        with caplog.at_level(logging.WARNING, logger="prowler"):
            cfg = load_and_validate_config_file("aws", config_path)
        assert cfg == {"lambda_min_azs": 2}
        logged = [record.getMessage() for record in caplog.records]
        assert any("disallowed_regions" in message for message in logged)

    def test_other_providers_unaffected_by_aws_block(self, write_config):
        config_path = write_config(
            "aws:\n max_ec2_instance_age_in_days: 90\n"
            "gcp:\n mig_min_zones: 5\n"
        )
        aws_cfg = load_and_validate_config_file("aws", config_path)
        assert aws_cfg == {"max_ec2_instance_age_in_days": 90}
        gcp_cfg = load_and_validate_config_file("gcp", config_path)
        assert gcp_cfg == {"mig_min_zones": 5}

    def test_missing_provider_block_returns_empty(self, write_config):
        config_path = write_config("aws:\n max_ec2_instance_age_in_days: 90\n")
        azure_cfg = load_and_validate_config_file("azure", config_path)
        assert azure_cfg == {}
@@ -0,0 +1,152 @@
"""Smaller-provider schema coverage. One happy path + one invalid path
per field is enough to lock in the contract; the validator behaviour
itself is covered exhaustively in validator_test.py."""
import pytest
from prowler.config.schema.registry import SCHEMAS
from prowler.config.schema.validator import validate_provider_config
def _validate(provider, raw):
    """Validate ``raw`` with the schema registered for ``provider`` in ``SCHEMAS``."""
    schema = SCHEMAS[provider]
    return validate_provider_config(provider, raw, schema)
class Test_Azure_Schema:
    """Happy-path and invalid-path checks for the Azure schema."""

    @pytest.mark.parametrize("level", ["Low", "Medium", "High", "Critical"])
    def test_defender_risk_level_valid_values(self, level):
        validated = _validate(
            "azure", {"defender_attack_path_minimal_risk_level": level}
        )
        assert validated == {"defender_attack_path_minimal_risk_level": level}

    def test_defender_risk_level_lowercase_dropped(self):
        # Case is significant: the matching check compares in Title-case.
        validated = _validate(
            "azure", {"defender_attack_path_minimal_risk_level": "high"}
        )
        assert validated == {}

    def test_apim_threshold_in_range(self):
        validated = _validate(
            "azure", {"apim_threat_detection_llm_jacking_threshold": 0.1}
        )
        assert validated == {"apim_threat_detection_llm_jacking_threshold": 0.1}

    def test_apim_threshold_out_of_range(self):
        validated = _validate(
            "azure", {"apim_threat_detection_llm_jacking_threshold": 1.5}
        )
        assert validated == {}

    def test_vm_backup_retention_must_be_positive(self):
        kept = _validate("azure", {"vm_backup_min_daily_retention_days": 7})
        assert kept == {"vm_backup_min_daily_retention_days": 7}
        for bad_days in (0, -1):
            dropped = _validate(
                "azure", {"vm_backup_min_daily_retention_days": bad_days}
            )
            assert dropped == {}
class Test_GCP_Schema:
    """Happy-path and invalid-path checks for the GCP schema."""

    def test_valid_values_round_trip(self):
        gcp_values = {
            "mig_min_zones": 2,
            "max_snapshot_age_days": 90,
            "max_unused_account_days": 180,
            "storage_min_retention_days": 90,
        }
        validated = _validate("gcp", gcp_values)
        assert validated == gcp_values

    def test_zero_zone_count_dropped(self):
        validated = _validate("gcp", {"mig_min_zones": 0})
        assert validated == {}
class Test_Kubernetes_Schema:
    """Happy-path and invalid-path checks for the Kubernetes schema."""

    def test_valid_values_round_trip(self):
        audit_settings = {
            "audit_log_maxbackup": 10,
            "audit_log_maxsize": 100,
            "audit_log_maxage": 30,
        }
        validated = _validate("kubernetes", audit_settings)
        assert validated == audit_settings

    def test_negative_audit_log_dropped(self):
        validated = _validate("kubernetes", {"audit_log_maxage": -1})
        assert validated == {}
class Test_M365_Schema:
    """Happy-path and invalid-path checks for the M365 schema."""

    def test_valid_values_round_trip(self):
        m365_values = {
            "sign_in_frequency": 4,
            "recommended_mailtips_large_audience_threshold": 25,
            "audit_log_age": 90,
        }
        validated = _validate("m365", m365_values)
        assert validated == m365_values

    def test_negative_audit_log_age_dropped(self):
        validated = _validate("m365", {"audit_log_age": -10})
        assert validated == {}
class Test_GitHub_Schema:
    """Happy-path and invalid-path checks for the GitHub schema."""

    def test_valid_threshold(self):
        validated = _validate("github", {"inactive_not_archived_days_threshold": 180})
        assert validated == {"inactive_not_archived_days_threshold": 180}

    def test_zero_threshold_dropped(self):
        validated = _validate("github", {"inactive_not_archived_days_threshold": 0})
        assert validated == {}
class Test_MongoDBAtlas_Schema:
    """Happy-path and invalid-path checks for the MongoDB Atlas schema."""

    def test_valid(self):
        validated = _validate(
            "mongodbatlas", {"max_service_account_secret_validity_hours": 8}
        )
        assert validated == {"max_service_account_secret_validity_hours": 8}

    def test_invalid_negative(self):
        validated = _validate(
            "mongodbatlas", {"max_service_account_secret_validity_hours": -1}
        )
        assert validated == {}
class Test_Cloudflare_Schema:
    """Happy-path and invalid-path checks for the Cloudflare schema."""

    def test_zero_retries_allowed(self):
        # config.yaml explicitly documents 0 as "disable retries".
        validated = _validate("cloudflare", {"max_retries": 0})
        assert validated == {"max_retries": 0}

    def test_positive_retries_allowed(self):
        validated = _validate("cloudflare", {"max_retries": 3})
        assert validated == {"max_retries": 3}

    def test_negative_retries_dropped(self):
        validated = _validate("cloudflare", {"max_retries": -1})
        assert validated == {}
class Test_Vercel_Schema:
    """Happy-path and invalid-path checks for the Vercel schema."""

    def test_owner_percentage_in_range(self):
        for percentage in (20, 1, 50):
            validated = _validate("vercel", {"max_owner_percentage": percentage})
            assert validated == {"max_owner_percentage": percentage}

    def test_owner_percentage_over_max_dropped(self):
        # The range was tightened to 1..50, so anything above it
        # (including the previous 100) is dropped.
        for percentage in (51, 150):
            assert _validate("vercel", {"max_owner_percentage": percentage}) == {}

    def test_owner_percentage_zero_or_negative_dropped(self):
        # 0 stopped being a valid configuration (defeats PoLP signal).
        for percentage in (0, -1):
            assert _validate("vercel", {"max_owner_percentage": percentage}) == {}

    def test_full_default_config_round_trip(self):
        vercel_defaults = {
            "stable_branches": ["main", "master"],
            "days_to_expire_threshold": 7,
            "stale_token_threshold_days": 90,
            "stale_invitation_threshold_days": 30,
            "max_owner_percentage": 20,
            "max_owners": 3,
            "secret_suffixes": ["_KEY", "_SECRET", "_TOKEN"],
        }
        validated = _validate("vercel", vercel_defaults)
        assert validated == vercel_defaults
+175
View File
@@ -0,0 +1,175 @@
"""Behavioural tests for ``validate_provider_config``.
The validator is the gatekeeper for every provider schema: its job is to
keep backwards-compatible behaviour (no exceptions, drop only the bad
keys) while loudly logging type mistakes.
"""
import logging
import pytest
from prowler.config.schema.aws import AWSProviderConfig
from prowler.config.schema.registry import SCHEMAS
from prowler.config.schema.validator import validate_provider_config
class Test_Validate_Provider_Config_Contract:
    """Schema-independent invariants of ``validate_provider_config``."""

    def test_returns_empty_dict_when_raw_is_not_a_dict(self):
        for not_a_dict in (None, "string", 42, []):
            validated = validate_provider_config("aws", not_a_dict, AWSProviderConfig)
            assert validated == {}

    def test_returns_raw_unchanged_when_no_schema_registered(self):
        payload = {"anything": "goes", "even": [1, 2, 3]}
        validated = validate_provider_config("mystery_provider", payload, None)
        assert validated == payload

    def test_unknown_keys_pass_through_for_plugin_compatibility(self):
        # Third-party plugins inject arbitrary keys, and the schema must NOT
        # filter them out. This contract is what keeps the plugin ecosystem
        # working once validation is added.
        payload = {"plugin_custom_key": "foo", "lambda_min_azs": 2}
        expected = {
            "plugin_custom_key": "foo",
            "lambda_min_azs": 2,
        }
        assert validate_provider_config("aws", payload, AWSProviderConfig) == expected

    def test_empty_dict_returns_empty_dict(self):
        validated = validate_provider_config("aws", {}, AWSProviderConfig)
        assert validated == {}

    def test_known_valid_value_passes_through_unchanged(self):
        payload = {"max_ec2_instance_age_in_days": 180}
        validated = validate_provider_config("aws", payload, AWSProviderConfig)
        assert validated == {"max_ec2_instance_age_in_days": 180}
class Test_Validate_Provider_Config_Coercion:
    """Pydantic v2 automatically coerces common type mistakes. That behaviour
    is kept on purpose so quoted numerics in user configs ``Just Work``."""

    def test_string_numeric_is_coerced_to_int(self):
        payload = {"max_ec2_instance_age_in_days": "180"}
        validated = validate_provider_config("aws", payload, AWSProviderConfig)
        assert validated == {"max_ec2_instance_age_in_days": 180}
        assert isinstance(validated["max_ec2_instance_age_in_days"], int)

    def test_string_numeric_is_coerced_to_float(self):
        payload = {"threat_detection_privilege_escalation_threshold": "0.4"}
        validated = validate_provider_config("aws", payload, AWSProviderConfig)
        assert validated == {"threat_detection_privilege_escalation_threshold": 0.4}
class Test_Validate_Provider_Config_Drops_Invalid_Keys:
    """A field that fails validation is the only key removed from the
    returned dict. Everything else in the user's config is kept, so the
    consumer's ``audit_config.get(key, default)`` falls back to its built-in
    default for the offending field and still uses the user's other values."""

    def test_out_of_range_threshold_is_dropped(self, caplog):
        payload = {
            "threat_detection_privilege_escalation_threshold": 2.0,
            "lambda_min_azs": 2,
        }
        with caplog.at_level(logging.WARNING):
            validated = validate_provider_config("aws", payload, AWSProviderConfig)
        assert validated == {"lambda_min_azs": 2}
        logged = [record.getMessage() for record in caplog.records]
        assert any(
            "threat_detection_privilege_escalation_threshold" in message
            for message in logged
        )

    def test_invalid_enum_is_dropped(self):
        payload = {"ecr_repository_vulnerability_minimum_severity": "medum"}
        validated = validate_provider_config("aws", payload, AWSProviderConfig)
        assert validated == {}

    def test_wrong_shape_list_as_string_is_dropped(self):
        # The classic YAML slip: ``disallowed_regions: me-south-1`` with no
        # dashes. Pydantic will not silently treat a str as a one-element
        # list, which is precisely the safety guarantee wanted here.
        payload = {"disallowed_regions": "me-south-1", "lambda_min_azs": 2}
        validated = validate_provider_config("aws", payload, AWSProviderConfig)
        assert validated == {"lambda_min_azs": 2}

    def test_negative_positive_int_is_dropped(self):
        payload = {"max_ec2_instance_age_in_days": -1}
        validated = validate_provider_config("aws", payload, AWSProviderConfig)
        assert validated == {}

    def test_zero_is_dropped_for_strictly_positive_field(self):
        # max_ec2_instance_age_in_days is gt=0. A zero would silently make
        # every instance FAIL the age check.
        payload = {"max_ec2_instance_age_in_days": 0}
        validated = validate_provider_config("aws", payload, AWSProviderConfig)
        assert validated == {}

    def test_multiple_invalid_keys_yield_multiple_warnings(self, caplog):
        payload = {
            "max_ec2_instance_age_in_days": "nope",
            "ecr_repository_vulnerability_minimum_severity": "medum",
            "valid_extra_key": "kept",
        }
        with caplog.at_level(logging.WARNING):
            validated = validate_provider_config("aws", payload, AWSProviderConfig)
        assert validated == {"valid_extra_key": "kept"}
        all_messages = " ".join(record.getMessage() for record in caplog.records)
        assert "max_ec2_instance_age_in_days" in all_messages
        assert "ecr_repository_vulnerability_minimum_severity" in all_messages

    def test_warning_message_includes_provider_and_field(self, caplog):
        payload = {"threat_detection_privilege_escalation_threshold": 5.0}
        with caplog.at_level(logging.WARNING):
            validate_provider_config("aws", payload, AWSProviderConfig)
        expected_tag = (
            "prowler.config[aws.threat_detection_privilege_escalation_threshold]"
        )
        logged = [record.getMessage() for record in caplog.records]
        assert any(expected_tag in message for message in logged)
class Test_Schemas_Registry:
    """Each provider that appears in the YAML config needs a schema."""

    @pytest.mark.parametrize(
        "provider",
        [
            "aws",
            "azure",
            "gcp",
            "kubernetes",
            "m365",
            "github",
            "mongodbatlas",
            "cloudflare",
            "vercel",
        ],
    )
    def test_schema_registered_for_provider(self, provider):
        """The provider is a key of ``SCHEMAS`` and maps to a non-None schema."""
        assert provider in SCHEMAS
        registered = SCHEMAS[provider]
        assert registered is not None