mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-02-09 02:30:43 +00:00
feat: add new validators
This commit is contained in:
@@ -31,31 +31,12 @@ aws:
|
||||
max_ec2_instance_age_in_days: 180
|
||||
# aws.ec2_securitygroup_allow_ingress_from_internet_to_any_port
|
||||
# allowed network interface types for security groups open to the Internet
|
||||
ec2_allowed_interface_types:
|
||||
[
|
||||
"api_gateway_managed",
|
||||
"vpc_endpoint",
|
||||
]
|
||||
ec2_allowed_interface_types: ["api_gateway_managed", "vpc_endpoint"]
|
||||
# allowed network interface owners for security groups open to the Internet
|
||||
ec2_allowed_instance_owners:
|
||||
[
|
||||
"amazon-elb"
|
||||
]
|
||||
ec2_allowed_instance_owners: ["amazon-elb"]
|
||||
# aws.ec2_securitygroup_allow_ingress_from_internet_to_high_risk_tcp_ports
|
||||
ec2_high_risk_ports:
|
||||
[
|
||||
25,
|
||||
110,
|
||||
135,
|
||||
143,
|
||||
445,
|
||||
3000,
|
||||
4333,
|
||||
5000,
|
||||
5500,
|
||||
8080,
|
||||
8088,
|
||||
]
|
||||
[25, 110, 135, 143, 445, 3000, 4333, 5000, 5500, 8080, 8088]
|
||||
|
||||
# AWS ECS Configuration
|
||||
# aws.ecs_service_fargate_latest_platform_version
|
||||
@@ -423,6 +404,81 @@ aws:
|
||||
[
|
||||
]
|
||||
|
||||
# AWS Check Validation Configuration
|
||||
# Valid CheckType values for AWS provider based on AWS Security Hub ASFF format
|
||||
# Reference: https://docs.aws.amazon.com/securityhub/latest/userguide/asff-required-attributes.html#Types
|
||||
# Supports partial paths: namespace, namespace/category, or namespace/category/classifier
|
||||
valid_check_types:
|
||||
"Software and Configuration Checks":
|
||||
"Vulnerabilities":
|
||||
"CVE": ""
|
||||
"AWS Security Best Practices":
|
||||
"Network Reachability": ""
|
||||
"Runtime Behavior Analysis": ""
|
||||
"Industry and Regulatory Standards":
|
||||
"AWS Foundational Security Best Practices": ""
|
||||
"CIS Host Hardening Benchmarks": ""
|
||||
"CIS AWS Foundations Benchmark": ""
|
||||
"PCI-DSS": ""
|
||||
"Cloud Security Alliance Controls": ""
|
||||
"ISO 90001 Controls": ""
|
||||
"ISO 27001 Controls": ""
|
||||
"ISO 27017 Controls": ""
|
||||
"ISO 27018 Controls": ""
|
||||
"SOC 1": ""
|
||||
"SOC 2": ""
|
||||
"HIPAA Controls (USA)": ""
|
||||
"NIST 800-53 Controls (USA)": ""
|
||||
"NIST CSF Controls (USA)": ""
|
||||
"IRAP Controls (Australia)": ""
|
||||
"K-ISMS Controls (Korea)": ""
|
||||
"MTCS Controls (Singapore)": ""
|
||||
"FISC Controls (Japan)": ""
|
||||
"My Number Act Controls (Japan)": ""
|
||||
"ENS Controls (Spain)": ""
|
||||
"Cyber Essentials Plus Controls (UK)": ""
|
||||
"G-Cloud Controls (UK)": ""
|
||||
"C5 Controls (Germany)": ""
|
||||
"IT-Grundschutz Controls (Germany)": ""
|
||||
"GDPR Controls (Europe)": ""
|
||||
"TISAX Controls (Europe)": ""
|
||||
"Patch Management": ""
|
||||
"TTPs":
|
||||
"Initial Access": ""
|
||||
"Execution": ""
|
||||
"Persistence": ""
|
||||
"Privilege Escalation": ""
|
||||
"Defense Evasion": ""
|
||||
"Credential Access": ""
|
||||
"Discovery": ""
|
||||
"Lateral Movement": ""
|
||||
"Collection": ""
|
||||
"Command and Control": ""
|
||||
"Effects":
|
||||
"Data Exposure": ""
|
||||
"Data Exfiltration": ""
|
||||
"Data Destruction": ""
|
||||
"Denial of Service": ""
|
||||
"Resource Consumption": ""
|
||||
"Unusual Behaviors":
|
||||
"Application": ""
|
||||
"Network Flow": ""
|
||||
"IP address": ""
|
||||
"User": ""
|
||||
"VM": ""
|
||||
"Container": ""
|
||||
"Serverless": ""
|
||||
"Process": ""
|
||||
"Database": ""
|
||||
"Data": ""
|
||||
"Sensitive Data Identifications":
|
||||
"PII": ""
|
||||
"Passwords": ""
|
||||
"Legal": ""
|
||||
"Financial": ""
|
||||
"Security": ""
|
||||
"Business": ""
|
||||
|
||||
# Azure Configuration
|
||||
azure:
|
||||
# Azure Network Configuration
|
||||
|
||||
@@ -15,6 +15,48 @@ from prowler.lib.check.utils import recover_checks_from_provider
|
||||
from prowler.lib.logger import logger
|
||||
|
||||
|
||||
def _validate_aws_check_type_in_config(check_type: str) -> bool:
|
||||
"""
|
||||
Validate if a CheckType exists in the AWS config using direct lookups.
|
||||
Supports partial paths: namespace, namespace/category, namespace/category/classifier
|
||||
|
||||
Args:
|
||||
check_type: The CheckType string to validate (e.g., "TTPs/Initial Access")
|
||||
|
||||
Returns:
|
||||
bool: True if the CheckType path exists in the config hierarchy
|
||||
"""
|
||||
try:
|
||||
# Get config directly from global provider like custom checks do
|
||||
from prowler.providers.common.provider import Provider
|
||||
|
||||
if not hasattr(Provider, "_global_provider") or not Provider._global_provider:
|
||||
return False
|
||||
|
||||
# Access config directly like: service_client.audit_config.get("key", default)
|
||||
hierarchy = Provider._global_provider.audit_config.get("aws", {}).get(
|
||||
"valid_check_types", {}
|
||||
)
|
||||
|
||||
if not check_type or not hierarchy:
|
||||
return False
|
||||
|
||||
# Split the path by '/' to get each level
|
||||
path_parts = check_type.split("/")
|
||||
|
||||
# Navigate through the hierarchy using direct lookups
|
||||
current_level = hierarchy
|
||||
for part in path_parts:
|
||||
if not isinstance(current_level, dict) or part not in current_level:
|
||||
return False
|
||||
current_level = current_level[part]
|
||||
|
||||
return True
|
||||
|
||||
except (KeyError, AttributeError):
|
||||
return False
|
||||
|
||||
|
||||
class Code(BaseModel):
|
||||
"""
|
||||
Represents the remediation code using IaC like CloudFormation, Terraform or the native CLI.
|
||||
@@ -94,9 +136,14 @@ class CheckMetadata(BaseModel):
|
||||
Validators:
|
||||
valid_category(value): Validator function to validate the categories of the check.
|
||||
severity_to_lower(severity): Validator function to convert the severity to lowercase.
|
||||
valid_severity(severity): Validator function to validate the severity of the check.
|
||||
valid_cli_command(remediation): Validator function to validate the CLI command is not an URL.
|
||||
valid_resource_type(resource_type): Validator function to validate the resource type is not empty.
|
||||
validate_service_name(service_name, values): Validator function to validate the service name matches CheckID.
|
||||
valid_check_id(check_id): Validator function to validate the CheckID format.
|
||||
validate_check_title(check_title): Validator function to validate CheckTitle max length (150 chars).
|
||||
validate_check_type(check_type, values): Validator function to validate CheckType - no empty strings for all providers, plus predefined types validation for AWS (loaded from config file).
|
||||
validate_description(description): Validator function to validate Description max length (400 chars).
|
||||
validate_risk(risk): Validator function to validate Risk max length (400 chars).
|
||||
"""
|
||||
|
||||
Provider: str
|
||||
@@ -178,6 +225,52 @@ class CheckMetadata(BaseModel):
|
||||
|
||||
return check_id
|
||||
|
||||
@validator("CheckTitle", pre=True, always=True)
|
||||
def validate_check_title(cls, check_title):
|
||||
if len(check_title) > 150:
|
||||
raise ValueError(
|
||||
f"CheckTitle must not exceed 150 characters, got {len(check_title)} characters"
|
||||
)
|
||||
return check_title
|
||||
|
||||
@validator("CheckType", pre=True, always=True)
|
||||
def validate_check_type(cls, check_type, values):
|
||||
# Check for empty strings in the list - applies to ALL providers
|
||||
for i, check_type_item in enumerate(check_type):
|
||||
if not check_type_item or check_type_item.strip() == "":
|
||||
raise ValueError(
|
||||
f"CheckType list cannot contain empty strings. Found empty string at index {i}."
|
||||
)
|
||||
|
||||
provider = values.get("Provider", "").lower()
|
||||
|
||||
# For AWS provider, also validate against config hierarchy (like custom checks)
|
||||
if provider == "aws":
|
||||
for check_type_item in check_type:
|
||||
if not _validate_aws_check_type_in_config(check_type_item):
|
||||
raise ValueError(
|
||||
f"Invalid CheckType: '{check_type_item}'. Must be a valid path in the AWS CheckType hierarchy. "
|
||||
f"See prowler/config/config.yaml 'aws.valid_check_types' for valid values."
|
||||
)
|
||||
|
||||
return check_type
|
||||
|
||||
@validator("Description", pre=True, always=True)
|
||||
def validate_description(cls, description):
|
||||
if len(description) > 400:
|
||||
raise ValueError(
|
||||
f"Description must not exceed 400 characters, got {len(description)} characters"
|
||||
)
|
||||
return description
|
||||
|
||||
@validator("Risk", pre=True, always=True)
|
||||
def validate_risk(cls, risk):
|
||||
if len(risk) > 400:
|
||||
raise ValueError(
|
||||
f"Risk must not exceed 400 characters, got {len(risk)} characters"
|
||||
)
|
||||
return risk
|
||||
|
||||
@staticmethod
|
||||
def get_bulk(provider: str) -> dict[str, "CheckMetadata"]:
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user