Compare commits

...

1 Commits

Author SHA1 Message Date
Toni de la Fuente
a534e50df4 feat(cloudflare): add Cloudflare provider with 13 security checks
Add complete Cloudflare provider integration to Prowler with comprehensive
security checks covering SSL/TLS, DNS, and firewall configurations.

Features:
- Cloudflare provider with API Token and API Key authentication
- 13 security checks across 3 services (SSL/TLS, DNS, Firewall)
- Support for zone-specific and account-wide scanning
- Full CLI integration with --api-token, --api-key, --api-email, --zone-id flags
- Mutelist support for suppressing findings
- Complete output support (CSV, JSON-OCSF, HTML)

Security Checks:
SSL/TLS (8 checks):
- ssl_mode_full_strict: Ensure SSL/TLS mode is Full (strict)
- ssl_tls_minimum_version: Ensure minimum TLS version is 1.2+
- ssl_tls_1_3_enabled: Ensure TLS 1.3 is enabled
- ssl_hsts_enabled: Ensure HSTS with recommended max-age
- ssl_hsts_include_subdomains: Ensure HSTS includes subdomains
- ssl_always_use_https: Ensure Always Use HTTPS is enabled
- ssl_automatic_https_rewrites_enabled: Ensure automatic HTTPS rewrites
- ssl_opportunistic_encryption_enabled: Ensure opportunistic encryption

Firewall (4 checks):
- firewall_waf_enabled: Ensure WAF is enabled
- firewall_security_level_medium_or_higher: Ensure security level >= medium
- firewall_browser_integrity_check_enabled: Ensure browser integrity check
- firewall_challenge_passage_configured: Ensure challenge passage configured

DNS (1 check):
- dns_dnssec_enabled: Ensure DNSSEC is enabled

Core Changes:
- Add CheckReportCloudflare model to prowler/lib/check/models.py
- Add Cloudflare provider initialization to prowler/providers/common/provider.py
- Add CloudflareOutputOptions to prowler/__main__.py
- Add Cloudflare output mapping to prowler/lib/outputs/finding.py
- Add Cloudflare entity type to prowler/lib/outputs/summary_table.py

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-23 18:06:44 +02:00
67 changed files with 2410 additions and 1 deletions

View File

@@ -49,10 +49,10 @@ from prowler.lib.outputs.asff.asff import ASFF
from prowler.lib.outputs.compliance.aws_well_architected.aws_well_architected import (
AWSWellArchitected,
)
from prowler.lib.outputs.compliance.c5.c5_aws import AWSC5
from prowler.lib.outputs.compliance.ccc.ccc_aws import CCC_AWS
from prowler.lib.outputs.compliance.ccc.ccc_azure import CCC_Azure
from prowler.lib.outputs.compliance.ccc.ccc_gcp import CCC_GCP
from prowler.lib.outputs.compliance.c5.c5_aws import AWSC5
from prowler.lib.outputs.compliance.cis.cis_aws import AWSCIS
from prowler.lib.outputs.compliance.cis.cis_azure import AzureCIS
from prowler.lib.outputs.compliance.cis.cis_gcp import GCPCIS
@@ -102,6 +102,7 @@ from prowler.providers.aws.lib.s3.s3 import S3
from prowler.providers.aws.lib.security_hub.security_hub import SecurityHub
from prowler.providers.aws.models import AWSOutputOptions
from prowler.providers.azure.models import AzureOutputOptions
from prowler.providers.cloudflare.models import CloudflareOutputOptions
from prowler.providers.common.provider import Provider
from prowler.providers.common.quick_inventory import run_provider_quick_inventory
from prowler.providers.gcp.models import GCPOutputOptions
@@ -336,6 +337,10 @@ def prowler():
output_options = OCIOutputOptions(
args, bulk_checks_metadata, global_provider.identity
)
elif provider == "cloudflare":
output_options = CloudflareOutputOptions(
args, bulk_checks_metadata, global_provider.identity
)
# Run the quick inventory for the provider if available
if hasattr(args, "quick_inventory") and args.quick_inventory:

View File

View File

@@ -694,6 +694,37 @@ class CheckReportGithub(Check_Report):
)
@dataclass
class CheckReportCloudflare(Check_Report):
"""Contains the Cloudflare Check's finding information."""
resource_name: str
resource_id: str
zone_name: str
def __init__(
self,
metadata: Dict,
resource: Any,
resource_name: str = None,
resource_id: str = None,
zone_name: str = None,
) -> None:
"""Initialize the Cloudflare Check's finding information.
Args:
metadata: The metadata of the check.
resource: Basic information about the resource. Defaults to None.
resource_name: The name of the resource related with the finding.
resource_id: The id of the resource related with the finding.
zone_name: The zone name of the resource related with the finding.
"""
super().__init__(metadata, resource)
self.resource_name = resource_name or getattr(resource, "name", "")
self.resource_id = resource_id or getattr(resource, "id", "")
self.zone_name = zone_name or getattr(resource, "zone_name", "")
@dataclass
class CheckReportM365(Check_Report):
"""Contains the M365 Check's finding information."""

View File

@@ -337,6 +337,21 @@ class Finding(BaseModel):
output_data["resource_uid"] = check_output.resource_id
output_data["region"] = check_output.region
elif provider.type == "cloudflare":
output_data["auth_method"] = provider.auth_method
output_data["account_uid"] = get_nested_attribute(
provider, "identity.account_id"
)
output_data["account_name"] = get_nested_attribute(
provider, "identity.account_name"
)
output_data["account_email"] = get_nested_attribute(
provider, "identity.account_email"
)
output_data["resource_name"] = check_output.resource_name
output_data["resource_uid"] = check_output.resource_id
output_data["region"] = check_output.zone_name
# check_output Unique ID
# TODO: move this to a function
# TODO: in Azure, GCP and K8s there are findings without resource_name

View File

@@ -67,6 +67,9 @@ def display_summary_table(
elif provider.type == "llm":
entity_type = "LLM"
audited_entities = provider.model
elif provider.type == "cloudflare":
entity_type = "Account"
audited_entities = provider.identity.account_name
elif provider.type == "oci":
entity_type = "Tenancy"
audited_entities = (

View File

@@ -0,0 +1,178 @@
# Cloudflare Provider for Prowler
This directory contains the Cloudflare provider implementation for Prowler, enabling Cloud Security Posture Management (CSPM) for Cloudflare infrastructure.
## Overview
The Cloudflare provider allows Prowler to scan and assess the security posture of your Cloudflare zones, firewall rules, SSL/TLS settings, and other security configurations.
## Authentication
The Cloudflare provider supports two authentication methods:
### 1. API Token (Recommended)
Create an API token with the necessary permissions at https://dash.cloudflare.com/profile/api-tokens
```bash
export CLOUDFLARE_API_TOKEN="your-api-token"
prowler cloudflare
```
Or pass it directly:
```bash
prowler cloudflare --api-token "your-api-token"
```
### 2. API Key + Email
Use your Global API Key and email:
```bash
export CLOUDFLARE_API_KEY="your-api-key"
export CLOUDFLARE_API_EMAIL="your@email.com"
prowler cloudflare
```
Or pass them directly:
```bash
prowler cloudflare --api-key "your-api-key" --api-email "your@email.com"
```
## Scoping
You can scope your scan to specific accounts or zones:
```bash
# Scan specific zones
prowler cloudflare --zone-id zone_id_1 zone_id_2
# Scan specific accounts
prowler cloudflare --account-id account_id_1 account_id_2
```
## Available Services
The Cloudflare provider currently includes the following services:
- **firewall**: Firewall rules and Web Application Firewall (WAF) settings
- **ssl**: SSL/TLS configuration and certificate settings
## Security Checks
### Firewall Service
- `firewall_waf_enabled`: Ensures Web Application Firewall (WAF) is enabled for zones
### SSL Service
- `ssl_tls_minimum_version`: Ensures minimum TLS version is set to 1.2 or higher
- `ssl_always_use_https`: Ensures 'Always Use HTTPS' is enabled for automatic HTTP to HTTPS redirects
## Directory Structure
```
cloudflare/
├── cloudflare_provider.py # Main provider class
├── models.py # Cloudflare-specific models
├── exceptions/ # Cloudflare-specific exceptions
│ └── exceptions.py
├── lib/
│ ├── arguments/ # CLI argument definitions
│ ├── mutelist/ # Mutelist functionality
│ └── service/ # Base service class
└── services/ # Cloudflare services
├── firewall/ # Firewall service and checks
│ ├── firewall_service.py
│ ├── firewall_client.py
│ └── firewall_waf_enabled/
└── ssl/ # SSL/TLS service and checks
├── ssl_service.py
├── ssl_client.py
├── ssl_tls_minimum_version/
└── ssl_always_use_https/
```
## Usage Examples
### Basic Scan
```bash
prowler cloudflare
```
### Scan with API Token
```bash
prowler cloudflare --api-token "your-api-token"
```
### Scan Specific Zones
```bash
prowler cloudflare --zone-id zone_123 zone_456
```
### Run Specific Checks
```bash
prowler cloudflare -c ssl_tls_minimum_version ssl_always_use_https
```
### Generate JSON Output
```bash
prowler cloudflare -o json
```
## Required Permissions
For the API token, you need the following permissions:
- **Zone:Read** - To list and read zone information
- **Zone Settings:Read** - To read zone settings including SSL/TLS configurations
- **Firewall Services:Read** - To read firewall rules and WAF settings
- **User:Read** - To verify authentication
## Adding New Checks
To add a new security check:
1. Create a new directory under the appropriate service (e.g., `services/firewall/new_check_name/`)
2. Create the check file: `new_check_name.py`
3. Create the metadata file: `new_check_name.metadata.json`
4. Implement the check class inheriting from `Check`
5. Use `CheckReportCloudflare` for findings
Example check structure:
```python
from typing import List
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.service_name.service_client import service_client
class check_name(Check):
def execute(self) -> List[CheckReportCloudflare]:
findings = []
for resource_id, resource in service_client.resources.items():
report = CheckReportCloudflare(metadata=self.metadata(), resource=resource)
# Implement your check logic here
findings.append(report)
return findings
```
## Contributing
When contributing new services or checks:
1. Follow the existing directory structure
2. Include comprehensive metadata for each check
3. Add appropriate error handling
4. Update this README with new services/checks
5. Test thoroughly with various Cloudflare configurations
## Support
For issues, questions, or contributions, please refer to the main Prowler repository.

View File

View File

@@ -0,0 +1,406 @@
import os
from os import environ
import requests
from colorama import Fore, Style
from prowler.config.config import (
default_config_file_path,
get_default_mute_file_path,
load_and_validate_config_file,
)
from prowler.lib.logger import logger
from prowler.lib.mutelist.mutelist import Mutelist
from prowler.lib.utils.utils import print_boxes
from prowler.providers.cloudflare.exceptions.exceptions import (
CloudflareEnvironmentVariableError,
CloudflareInvalidCredentialsError,
CloudflareSetUpIdentityError,
CloudflareSetUpSessionError,
)
from prowler.providers.cloudflare.lib.mutelist.mutelist import CloudflareMutelist
from prowler.providers.cloudflare.models import (
CloudflareIdentityInfo,
CloudflareSession,
)
from prowler.providers.common.models import Audit_Metadata, Connection
from prowler.providers.common.provider import Provider
class CloudflareProvider(Provider):
"""
Cloudflare Provider class
This class is responsible for setting up the Cloudflare provider, including the session, identity,
audit configuration, fixer configuration, and mutelist.
Attributes:
_type (str): The type of the provider.
_auth_method (str): The authentication method used by the provider.
_session (CloudflareSession): The session object for the provider.
_identity (CloudflareIdentityInfo): The identity information for the provider.
_audit_config (dict): The audit configuration for the provider.
_fixer_config (dict): The fixer configuration for the provider.
_mutelist (Mutelist): The mutelist for the provider.
_account_ids (list): List of account IDs to scan.
_zone_ids (list): List of zone IDs to scan.
audit_metadata (Audit_Metadata): The audit metadata for the provider.
"""
_type: str = "cloudflare"
_auth_method: str = None
_session: CloudflareSession
_identity: CloudflareIdentityInfo
_audit_config: dict
_mutelist: Mutelist
_account_ids: list
_zone_ids: list
audit_metadata: Audit_Metadata
def __init__(
self,
# Authentication credentials
api_token: str = "",
api_key: str = "",
api_email: str = "",
# Provider configuration
config_path: str = None,
config_content: dict = None,
fixer_config: dict = {},
mutelist_path: str = None,
mutelist_content: dict = None,
account_ids: list = None,
zone_ids: list = None,
):
"""
Cloudflare Provider constructor
Args:
api_token (str): Cloudflare API Token.
api_key (str): Cloudflare API Key.
api_email (str): Cloudflare API Email (used with API Key).
config_path (str): Path to the audit configuration file.
config_content (dict): Audit configuration content.
fixer_config (dict): Fixer configuration content.
mutelist_path (str): Path to the mutelist file.
mutelist_content (dict): Mutelist content.
account_ids (list): List of account IDs to scan.
zone_ids (list): List of zone IDs to scan.
"""
logger.info("Instantiating Cloudflare Provider...")
# Set scoping parameters
self._account_ids = account_ids or []
self._zone_ids = zone_ids or []
self._session = CloudflareProvider.setup_session(api_token, api_key, api_email)
# Set the authentication method
if api_token:
self._auth_method = "API Token"
elif api_key and api_email:
self._auth_method = "API Key + Email"
elif environ.get("CLOUDFLARE_API_TOKEN", ""):
self._auth_method = "Environment Variable for API Token"
elif environ.get("CLOUDFLARE_API_KEY", "") and environ.get(
"CLOUDFLARE_API_EMAIL", ""
):
self._auth_method = "Environment Variables for API Key and Email"
self._identity = CloudflareProvider.setup_identity(self._session)
# Audit Config
if config_content:
self._audit_config = config_content
else:
if not config_path:
config_path = default_config_file_path
self._audit_config = load_and_validate_config_file(self._type, config_path)
# Fixer Config
self._fixer_config = fixer_config
# Mutelist
if mutelist_content:
self._mutelist = CloudflareMutelist(
mutelist_content=mutelist_content,
)
else:
if not mutelist_path:
mutelist_path = get_default_mute_file_path(self.type)
self._mutelist = CloudflareMutelist(
mutelist_path=mutelist_path,
)
Provider.set_global_provider(self)
@property
def auth_method(self):
"""Returns the authentication method for the Cloudflare provider."""
return self._auth_method
@property
def session(self):
"""Returns the session object for the Cloudflare provider."""
return self._session
@property
def identity(self):
"""Returns the identity information for the Cloudflare provider."""
return self._identity
@property
def type(self):
"""Returns the type of the Cloudflare provider."""
return self._type
@property
def audit_config(self):
return self._audit_config
@property
def fixer_config(self):
return self._fixer_config
@property
def mutelist(self) -> CloudflareMutelist:
"""
mutelist method returns the provider's mutelist.
"""
return self._mutelist
@property
def account_ids(self) -> list:
"""
account_ids method returns the provider's account ID list for scoping.
"""
return self._account_ids
@property
def zone_ids(self) -> list:
"""
zone_ids method returns the provider's zone ID list for scoping.
"""
return self._zone_ids
@staticmethod
def setup_session(
api_token: str = None,
api_key: str = None,
api_email: str = None,
) -> CloudflareSession:
"""
Returns the Cloudflare session with authentication credentials.
Args:
api_token (str): Cloudflare API Token.
api_key (str): Cloudflare API Key.
api_email (str): Cloudflare API Email.
Returns:
CloudflareSession: Authenticated session credentials for API requests.
"""
session_api_token = ""
session_api_key = ""
session_api_email = ""
try:
# Ensure that at least one authentication method is selected
if api_token:
session_api_token = api_token
elif api_key and api_email:
session_api_key = api_key
session_api_email = api_email
else:
# Try API Token from environment variable
logger.info(
"Looking for CLOUDFLARE_API_TOKEN environment variable as user has not provided any credentials...."
)
session_api_token = environ.get("CLOUDFLARE_API_TOKEN", "")
if not session_api_token:
# Try API Key + Email from environment variables
logger.info(
"Looking for CLOUDFLARE_API_KEY and CLOUDFLARE_API_EMAIL environment variables...."
)
session_api_key = environ.get("CLOUDFLARE_API_KEY", "")
session_api_email = environ.get("CLOUDFLARE_API_EMAIL", "")
if not session_api_token and not (session_api_key and session_api_email):
raise CloudflareEnvironmentVariableError(
file=os.path.basename(__file__),
message="No authentication method selected and no environment variables were found.",
)
credentials = CloudflareSession(
api_token=session_api_token,
api_key=session_api_key,
api_email=session_api_email,
)
return credentials
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
raise CloudflareSetUpSessionError(
original_exception=error,
)
@staticmethod
def setup_identity(session: CloudflareSession) -> CloudflareIdentityInfo:
"""
Returns the Cloudflare identity information
Returns:
CloudflareIdentityInfo: An instance of CloudflareIdentityInfo containing the identity information.
"""
try:
# Setup headers for API requests
headers = CloudflareProvider._get_headers(session)
# Verify user endpoint to get account information
response = requests.get(
"https://api.cloudflare.com/client/v4/user", headers=headers, timeout=10
)
if response.status_code != 200:
raise CloudflareInvalidCredentialsError(
message=f"Failed to authenticate with Cloudflare API: {response.status_code} - {response.text}"
)
try:
user_data = response.json()
except Exception as json_error:
raise CloudflareInvalidCredentialsError(
message=f"Failed to parse Cloudflare API response: {json_error}. Response text: {response.text[:200]}"
)
if not user_data:
raise CloudflareInvalidCredentialsError(
message=f"Cloudflare API returned empty response. Status: {response.status_code}"
)
if not user_data.get("success", False):
error_messages = user_data.get("errors", [])
raise CloudflareInvalidCredentialsError(
message=f"Cloudflare API authentication failed: {error_messages}"
)
result = user_data.get("result")
if not result:
raise CloudflareInvalidCredentialsError(
message=f"Cloudflare API returned empty result. Full response: {user_data}"
)
identity = CloudflareIdentityInfo(
account_id=str(result.get("id", "")),
account_name=result.get("username") or result.get("email", "Unknown"),
account_email=result.get("email", ""),
)
return identity
except CloudflareInvalidCredentialsError:
raise
except Exception as error:
# Get line number safely
lineno = error.__traceback__.tb_lineno if error.__traceback__ else "unknown"
logger.critical(f"{error.__class__.__name__}[{lineno}]: {error}")
raise CloudflareSetUpIdentityError(
original_exception=error,
)
@staticmethod
def _get_headers(session: CloudflareSession) -> dict:
"""
Returns HTTP headers for Cloudflare API requests.
Args:
session (CloudflareSession): The Cloudflare session with authentication.
Returns:
dict: Headers dictionary with authentication credentials.
"""
headers = {"Content-Type": "application/json"}
if session.api_token:
headers["Authorization"] = f"Bearer {session.api_token}"
elif session.api_key and session.api_email:
headers["X-Auth-Key"] = session.api_key
headers["X-Auth-Email"] = session.api_email
return headers
def print_credentials(self):
"""
Prints the Cloudflare credentials.
Usage:
>>> self.print_credentials()
"""
report_lines = [
f"Cloudflare Account ID: {Fore.YELLOW}{self.identity.account_id}{Style.RESET_ALL}",
f"Cloudflare Account Name: {Fore.YELLOW}{self.identity.account_name}{Style.RESET_ALL}",
f"Cloudflare Account Email: {Fore.YELLOW}{self.identity.account_email}{Style.RESET_ALL}",
f"Authentication Method: {Fore.YELLOW}{self.auth_method}{Style.RESET_ALL}",
]
report_title = (
f"{Style.BRIGHT}Using the Cloudflare credentials below:{Style.RESET_ALL}"
)
print_boxes(report_lines, report_title)
@staticmethod
def test_connection(
api_token: str = "",
api_key: str = "",
api_email: str = "",
raise_on_exception: bool = True,
) -> Connection:
"""Test connection to Cloudflare.
Test the connection to Cloudflare using the provided credentials.
Args:
api_token (str): Cloudflare API Token.
api_key (str): Cloudflare API Key.
api_email (str): Cloudflare API Email.
raise_on_exception (bool): Flag indicating whether to raise an exception if the connection fails.
Returns:
Connection: Connection object with success status or error information.
Raises:
Exception: If failed to test the connection to Cloudflare.
CloudflareEnvironmentVariableError: If environment variables are missing.
CloudflareInvalidCredentialsError: If the provided credentials are invalid.
CloudflareSetUpSessionError: If there is an error setting up the session.
CloudflareSetUpIdentityError: If there is an error setting up the identity.
Examples:
>>> CloudflareProvider.test_connection(api_token="your-api-token")
Connection(is_connected=True)
>>> CloudflareProvider.test_connection(api_key="your-api-key", api_email="your@email.com")
Connection(is_connected=True)
"""
try:
# Set up the Cloudflare session
session = CloudflareProvider.setup_session(
api_token=api_token,
api_key=api_key,
api_email=api_email,
)
# Set up the identity to test the connection
CloudflareProvider.setup_identity(session)
return Connection(is_connected=True)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
if raise_on_exception:
raise error
return Connection(error=error)

View File

@@ -0,0 +1,13 @@
from prowler.providers.cloudflare.exceptions.exceptions import (
CloudflareEnvironmentVariableError,
CloudflareInvalidCredentialsError,
CloudflareSetUpIdentityError,
CloudflareSetUpSessionError,
)
__all__ = [
"CloudflareEnvironmentVariableError",
"CloudflareInvalidCredentialsError",
"CloudflareSetUpIdentityError",
"CloudflareSetUpSessionError",
]

View File

@@ -0,0 +1,71 @@
from prowler.exceptions.exceptions import ProwlerException
class CloudflareException(ProwlerException):
"""Base class for Cloudflare Provider exceptions"""
CLOUDFLARE_ERROR_CODES = {
(1000, "CloudflareEnvironmentVariableError"): {
"message": "Cloudflare environment variables are not set correctly",
"remediation": "Ensure that CLOUDFLARE_API_TOKEN or CLOUDFLARE_API_KEY and CLOUDFLARE_API_EMAIL environment variables are set correctly.",
},
(1001, "CloudflareInvalidCredentialsError"): {
"message": "Cloudflare credentials are invalid",
"remediation": "Ensure that the provided Cloudflare API credentials are valid and have the necessary permissions.",
},
(1002, "CloudflareSetUpSessionError"): {
"message": "Error setting up Cloudflare session",
"remediation": "Check your Cloudflare API credentials and network connectivity.",
},
(1003, "CloudflareSetUpIdentityError"): {
"message": "Error setting up Cloudflare identity",
"remediation": "Ensure that your Cloudflare API credentials have the necessary permissions to retrieve account information.",
},
}
def __init__(self, code, file=None, original_exception=None, message=None):
provider = "Cloudflare"
error_info = self.CLOUDFLARE_ERROR_CODES.get((code, self.__class__.__name__))
if not error_info:
error_info = {
"message": "Unknown Cloudflare error",
"remediation": "Please check your configuration.",
}
if message:
error_info = error_info.copy()
error_info["message"] = message
super().__init__(
code=code,
source=provider,
file=file,
original_exception=original_exception,
error_info=error_info,
)
class CloudflareEnvironmentVariableError(CloudflareException):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
1000, file=file, original_exception=original_exception, message=message
)
class CloudflareInvalidCredentialsError(CloudflareException):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
1001, file=file, original_exception=original_exception, message=message
)
class CloudflareSetUpSessionError(CloudflareException):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
1002, file=file, original_exception=original_exception, message=message
)
class CloudflareSetUpIdentityError(CloudflareException):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
1003, file=file, original_exception=original_exception, message=message
)

View File

@@ -0,0 +1,76 @@
def init_parser(self):
"""Init the Cloudflare Provider CLI parser"""
cloudflare_parser = self.subparsers.add_parser(
"cloudflare",
parents=[self.common_providers_parser],
help="Cloudflare Provider",
)
cloudflare_auth_subparser = cloudflare_parser.add_argument_group(
"Authentication Modes"
)
# Authentication Modes
cloudflare_auth_subparser.add_argument(
"--api-token",
nargs="?",
help="Cloudflare API Token for authentication",
default=None,
metavar="CLOUDFLARE_API_TOKEN",
)
cloudflare_auth_subparser.add_argument(
"--api-key",
nargs="?",
help="Cloudflare API Key for authentication (requires --api-email)",
default=None,
metavar="CLOUDFLARE_API_KEY",
)
cloudflare_auth_subparser.add_argument(
"--api-email",
nargs="?",
help="Cloudflare API Email for authentication (used with --api-key)",
default=None,
metavar="CLOUDFLARE_API_EMAIL",
)
cloudflare_scoping_subparser = cloudflare_parser.add_argument_group("Scan Scoping")
cloudflare_scoping_subparser.add_argument(
"--account-id",
"--account-ids",
nargs="*",
help="Cloudflare Account ID(s) to scan",
default=None,
metavar="ACCOUNT_ID",
)
cloudflare_scoping_subparser.add_argument(
"--zone-id",
"--zone-ids",
nargs="*",
help="Cloudflare Zone ID(s) to scan",
default=None,
metavar="ZONE_ID",
)
def validate_arguments(arguments):
"""
Validate Cloudflare provider arguments.
Returns:
tuple: (is_valid, error_message)
"""
# If API key is provided, email must also be provided
if arguments.api_key and not arguments.api_email:
return (
False,
"Cloudflare API Key requires API Email. Please provide --api-email",
)
if arguments.api_email and not arguments.api_key:
return (
False,
"Cloudflare API Email requires API Key. Please provide --api-key",
)
return (True, "")

View File

@@ -0,0 +1,34 @@
from prowler.lib.check.models import CheckReportCloudflare
from prowler.lib.mutelist.mutelist import Mutelist
from prowler.lib.outputs.utils import unroll_dict, unroll_tags
class CloudflareMutelist(Mutelist):
"""
CloudflareMutelist class extends the Mutelist class to provide Cloudflare-specific mutelist functionality.
This class is used to manage muted findings for Cloudflare resources.
"""
def is_finding_muted(
self,
finding: CheckReportCloudflare,
account_name: str,
) -> bool:
"""
Check if a finding is muted based on the mutelist configuration.
Args:
finding (CheckReportCloudflare): The finding to check
account_name (str): The Cloudflare account name
Returns:
bool: True if the finding is muted, False otherwise
"""
return self.is_muted(
account_name,
finding.check_metadata.CheckID,
"*", # Cloudflare doesn't have regions
finding.resource_name,
unroll_dict(unroll_tags(finding.resource_tags)),
)

View File

@@ -0,0 +1,169 @@
import requests
from colorama import Fore, Style
from prowler.lib.logger import logger
class CloudflareService:
"""
Base class for Cloudflare services
This class provides common functionality for all Cloudflare services,
including API client setup and error handling.
"""
def __init__(self, service_name: str, provider):
"""
Initialize CloudflareService
Args:
service_name (str): Name of the service
provider: Cloudflare provider instance
"""
self.service = service_name
self.provider = provider
self.session = provider.session
self.api_base_url = "https://api.cloudflare.com/client/v4"
self.headers = self._get_headers()
def _get_headers(self) -> dict:
"""
Returns HTTP headers for Cloudflare API requests.
Returns:
dict: Headers dictionary with authentication credentials.
"""
headers = {"Content-Type": "application/json"}
if self.session.api_token:
headers["Authorization"] = f"Bearer {self.session.api_token}"
elif self.session.api_key and self.session.api_email:
headers["X-Auth-Key"] = self.session.api_key
headers["X-Auth-Email"] = self.session.api_email
return headers
def _api_request(
self, method: str, endpoint: str, params: dict = None, json_data: dict = None
) -> dict:
"""
Make an API request to Cloudflare
Args:
method (str): HTTP method (GET, POST, PUT, DELETE)
endpoint (str): API endpoint (e.g., "/accounts")
params (dict): Query parameters
json_data (dict): JSON data for POST/PUT requests
Returns:
dict: API response data
Raises:
Exception: If the API request fails
"""
url = f"{self.api_base_url}{endpoint}"
try:
response = requests.request(
method=method,
url=url,
headers=self.headers,
params=params,
json=json_data,
timeout=30,
)
response.raise_for_status()
data = response.json()
if not data.get("success"):
errors = data.get("errors", [])
logger.error(
f"{Fore.RED}Cloudflare API Error:{Style.RESET_ALL} {errors}"
)
return {}
return data.get("result", {})
except requests.exceptions.RequestException as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return {}
def _api_request_paginated(
self, endpoint: str, params: dict = None, page_size: int = 50
) -> list:
"""
Make a paginated API request to Cloudflare
Args:
endpoint (str): API endpoint
params (dict): Query parameters
page_size (int): Number of results per page
Returns:
list: Combined results from all pages
"""
all_results = []
page = 1
if params is None:
params = {}
params["per_page"] = page_size
while True:
params["page"] = page
url = f"{self.api_base_url}{endpoint}"
try:
response = requests.get(
url, headers=self.headers, params=params, timeout=30
)
response.raise_for_status()
data = response.json()
if not data.get("success"):
break
result = data.get("result", [])
if not result:
break
all_results.extend(result)
# Check if there are more pages
result_info = data.get("result_info", {})
if page >= result_info.get("total_pages", 0):
break
page += 1
except requests.exceptions.RequestException as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
break
return all_results
def _handle_cloudflare_api_error(
self, error: Exception, action: str, resource: str = ""
):
"""
Handle Cloudflare API errors with consistent logging
Args:
error (Exception): The exception that occurred
action (str): Description of the action being performed
resource (str): The resource being accessed
"""
error_message = f"Error {action}"
if resource:
error_message += f" for {resource}"
error_message += f": {error}"
logger.error(
f"{Fore.RED}{error_message}{Style.RESET_ALL} ({error.__class__.__name__})"
)

View File

@@ -0,0 +1,40 @@
from typing import Optional
from pydantic.v1 import BaseModel
from prowler.config.config import output_file_timestamp
from prowler.providers.common.models import ProviderOutputOptions
class CloudflareSession(BaseModel):
"""Cloudflare session model storing authentication credentials"""
api_token: Optional[str] = None
api_key: Optional[str] = None
api_email: Optional[str] = None
class CloudflareIdentityInfo(BaseModel):
"""Cloudflare account identity information"""
account_id: str
account_name: str
account_email: str
class CloudflareOutputOptions(ProviderOutputOptions):
"""Cloudflare-specific output options"""
def __init__(self, arguments, bulk_checks_metadata, identity):
# First call ProviderOutputOptions init
super().__init__(arguments, bulk_checks_metadata)
# Check if custom output filename was input, if not, set the default
if (
not hasattr(arguments, "output_filename")
or arguments.output_filename is None
):
self.output_filename = (
f"prowler-output-{identity.account_name}-{output_file_timestamp}"
)
else:
self.output_filename = arguments.output_filename

View File

@@ -0,0 +1,3 @@
from .dns_service import DNS
dns_client = DNS

View File

@@ -0,0 +1,4 @@
from prowler.providers.cloudflare.services.dns.dns_service import DNS
from prowler.providers.common.provider import Provider
dns_client = DNS(Provider.get_global_provider())

View File

@@ -0,0 +1,32 @@
{
"Provider": "cloudflare",
"CheckID": "dns_dnssec_enabled",
"CheckTitle": "Ensure DNSSEC is enabled to prevent DNS spoofing",
"CheckType": [],
"ServiceName": "dns",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Zone",
"Description": "This check ensures that DNSSEC (DNS Security Extensions) is enabled for Cloudflare zones to prevent DNS spoofing attacks and ensure data integrity by cryptographically signing DNS records.",
"Risk": "Without DNSSEC enabled, attackers can perform DNS spoofing (cache poisoning) attacks, redirecting users to malicious sites and intercepting sensitive information.",
"RelatedUrl": "https://developers.cloudflare.com/dns/dnssec/",
"Remediation": {
"Code": {
"CLI": "cloudflare dns dnssec enable --zone-id <zone_id>",
"NativeIaC": "",
"Other": "https://dash.cloudflare.com/ -> Select Zone -> DNS -> Settings -> DNSSEC -> Enable DNSSEC",
"Terraform": "resource \"cloudflare_zone_dnssec\" \"example\" {\n zone_id = var.zone_id\n}"
},
"Recommendation": {
"Text": "Enable DNSSEC for all Cloudflare zones to prevent DNS spoofing and ensure DNS data integrity. After enabling, add DS records to your domain registrar.",
"Url": "https://developers.cloudflare.com/dns/dnssec/"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "After enabling DNSSEC in Cloudflare, you must add the DS records to your domain registrar for DNSSEC to function properly."
}

View File

@@ -0,0 +1,31 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.dns.dns_client import dns_client
class dns_dnssec_enabled(Check):
"""Check if DNSSEC is enabled to prevent DNS spoofing"""
def execute(self) -> List[CheckReportCloudflare]:
findings = []
for zone_id, dnssec_settings in dns_client.dnssec_settings.items():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=dnssec_settings,
resource_name=dnssec_settings.zone_name,
resource_id=zone_id,
zone_name=dnssec_settings.zone_name,
)
if dnssec_settings.dnssec_enabled:
report.status = "PASS"
report.status_extended = f"Zone {dnssec_settings.zone_name} has DNSSEC enabled (status: {dnssec_settings.dnssec_status}), preventing DNS spoofing and ensuring data integrity."
else:
report.status = "FAIL"
report.status_extended = f"Zone {dnssec_settings.zone_name} does not have DNSSEC enabled (status: {dnssec_settings.dnssec_status}). Enable DNSSEC to prevent DNS spoofing and ensure data integrity."
findings.append(report)
return findings

View File

@@ -0,0 +1,107 @@
from pydantic.v1 import BaseModel
from prowler.lib.logger import logger
from prowler.providers.cloudflare.lib.service.service import CloudflareService
class DNS(CloudflareService):
"""Cloudflare DNS service for managing DNS settings"""
def __init__(self, provider):
super().__init__(__class__.__name__, provider)
self.zones = self._list_zones()
self.dnssec_settings = self._get_dnssec_settings()
def _list_zones(self) -> dict:
"""
List all Cloudflare zones
Returns:
dict: Dictionary of zones keyed by zone ID
"""
logger.info("DNS - Listing Zones...")
zones = {}
try:
# If specific zone IDs are provided, use those
if self.provider.zone_ids:
for zone_id in self.provider.zone_ids:
zone_data = self._api_request("GET", f"/zones/{zone_id}")
if zone_data:
zones[zone_data["id"]] = Zone(
id=zone_data["id"],
name=zone_data["name"],
account_id=zone_data.get("account", {}).get("id", ""),
)
else:
# List all zones
all_zones = self._api_request_paginated("/zones")
for zone_data in all_zones:
zones[zone_data["id"]] = Zone(
id=zone_data["id"],
name=zone_data["name"],
account_id=zone_data.get("account", {}).get("id", ""),
)
logger.info(f"Found {len(zones)} zone(s) for DNS checks")
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return zones
def _get_dnssec_settings(self) -> dict:
"""
Get DNSSEC settings for all zones
Returns:
dict: Dictionary of DNSSEC settings keyed by zone ID
"""
logger.info("DNS - Getting DNSSEC Settings...")
dnssec_settings = {}
try:
for zone_id, zone in self.zones.items():
# Get DNSSEC status
dnssec = self._api_request("GET", f"/zones/{zone_id}/dnssec")
dnssec_settings[zone_id] = DNSSECSettings(
zone_id=zone_id,
zone_name=zone.name,
dnssec_enabled=(
dnssec.get("status", "disabled") == "active"
if dnssec
else False
),
dnssec_status=(
dnssec.get("status", "disabled") if dnssec else "disabled"
),
)
logger.info(f"Retrieved DNSSEC settings for {len(dnssec_settings)} zone(s)")
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return dnssec_settings
class Zone(BaseModel):
"""Model for Cloudflare Zone"""
id: str
name: str
account_id: str
class DNSSECSettings(BaseModel):
"""Model for Cloudflare DNSSEC Settings"""
zone_id: str
zone_name: str
dnssec_enabled: bool
dnssec_status: str

View File

@@ -0,0 +1,32 @@
{
"Provider": "cloudflare",
"CheckID": "firewall_browser_integrity_check_enabled",
"CheckTitle": "Ensure Browser Integrity Check is enabled to filter malicious traffic",
"CheckType": [],
"ServiceName": "firewall",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Zone",
"Description": "This check ensures that Browser Integrity Check is enabled for Cloudflare zones to filter malicious traffic based on HTTP header anomalies and known attack patterns.",
"Risk": "Without Browser Integrity Check enabled, malicious bots and automated tools with suspicious HTTP headers can access your site, increasing the risk of attacks.",
"RelatedUrl": "https://developers.cloudflare.com/waf/tools/browser-integrity-check/",
"Remediation": {
"Code": {
"CLI": "cloudflare firewall browser-check enable --zone-id <zone_id>",
"NativeIaC": "",
"Other": "https://dash.cloudflare.com/ -> Select Zone -> Security -> Settings -> Browser Integrity Check -> Enable",
"Terraform": "resource \"cloudflare_zone_settings_override\" \"example\" {\n zone_id = var.zone_id\n settings {\n browser_check = \"on\"\n }\n}"
},
"Recommendation": {
"Text": "Enable Browser Integrity Check for all Cloudflare zones to filter malicious traffic based on HTTP header anomalies.",
"Url": "https://developers.cloudflare.com/waf/tools/browser-integrity-check/"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Browser Integrity Check may occasionally block legitimate traffic from older browsers or automated tools. Monitor and adjust if needed."
}

View File

@@ -0,0 +1,33 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.firewall.firewall_client import (
firewall_client,
)
class firewall_browser_integrity_check_enabled(Check):
"""Check if Browser Integrity Check is enabled to filter malicious traffic"""
def execute(self) -> List[CheckReportCloudflare]:
findings = []
for zone_id, security_settings in firewall_client.security_settings.items():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=security_settings,
resource_name=security_settings.zone_name,
resource_id=zone_id,
zone_name=security_settings.zone_name,
)
if security_settings.browser_integrity_check:
report.status = "PASS"
report.status_extended = f"Zone {security_settings.zone_name} has Browser Integrity Check enabled, filtering malicious traffic based on HTTP header anomalies."
else:
report.status = "FAIL"
report.status_extended = f"Zone {security_settings.zone_name} does not have Browser Integrity Check enabled. Enable it to filter malicious traffic based on HTTP header anomalies."
findings.append(report)
return findings

View File

@@ -0,0 +1,30 @@
{
"Provider": "cloudflare",
"CheckID": "firewall_challenge_passage_configured",
"CheckTitle": "Ensure Challenge Passage is configured appropriately",
"CheckType": [],
"ServiceName": "firewall",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "low",
"ResourceType": "Zone",
"Description": "This check ensures that Challenge Passage (challenge TTL) is configured to an appropriate value (recommended: 1 hour / 3600 seconds) to reduce friction for verified visitors while maintaining a security window.",
"Risk": "Setting Challenge Passage too short causes excessive challenges for legitimate users, degrading experience. Setting it too long may allow attackers more time to exploit compromised sessions.",
"RelatedUrl": "https://developers.cloudflare.com/waf/tools/challenge-passage/",
"Remediation": {
"Code": {
"CLI": "cloudflare firewall challenge-ttl set --zone-id <zone_id> --ttl 3600",
"NativeIaC": "",
"Other": "https://dash.cloudflare.com/ -> Select Zone -> Security -> Settings -> Challenge Passage -> Set to 1 hour",
"Terraform": "resource \"cloudflare_zone_settings_override\" \"example\" {\n zone_id = var.zone_id\n settings {\n challenge_ttl = 3600\n }\n}"
},
"Recommendation": {
"Text": "Set Challenge Passage to 1 hour (3600 seconds) for all Cloudflare zones to balance security with user experience.",
"Url": "https://developers.cloudflare.com/waf/tools/challenge-passage/"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Challenge Passage determines how long a visitor who passes a challenge can access the site without being challenged again."
}

View File

@@ -0,0 +1,35 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.firewall.firewall_client import (
firewall_client,
)
class firewall_challenge_passage_configured(Check):
"""Check if Challenge Passage is configured appropriately"""
def execute(self) -> List[CheckReportCloudflare]:
findings = []
# Recommended challenge TTL is 1 hour (3600 seconds) to balance security and user experience
recommended_ttl = 3600
for zone_id, security_settings in firewall_client.security_settings.items():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=security_settings,
resource_name=security_settings.zone_name,
resource_id=zone_id,
zone_name=security_settings.zone_name,
)
if security_settings.challenge_ttl == recommended_ttl:
report.status = "PASS"
report.status_extended = f"Zone {security_settings.zone_name} has Challenge Passage set to {security_settings.challenge_ttl} seconds (recommended: {recommended_ttl}), balancing security with user experience."
else:
report.status = "FAIL"
report.status_extended = f"Zone {security_settings.zone_name} has Challenge Passage set to {security_settings.challenge_ttl} seconds. Recommended: {recommended_ttl} seconds (1 hour) to reduce friction for verified visitors while maintaining security."
findings.append(report)
return findings

View File

@@ -0,0 +1,4 @@
from prowler.providers.cloudflare.services.firewall.firewall_service import Firewall
from prowler.providers.common.provider import Provider
firewall_client = Firewall(Provider.get_global_provider())

View File

@@ -0,0 +1,32 @@
{
"Provider": "cloudflare",
"CheckID": "firewall_security_level_medium_or_higher",
"CheckTitle": "Ensure Security Level is set to Medium or higher",
"CheckType": [],
"ServiceName": "firewall",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Zone",
"Description": "This check ensures that Security Level is set to Medium or higher for Cloudflare zones to balance protection with user accessibility by filtering suspicious traffic.",
"Risk": "Setting Security Level too low (off, essentially off, or low) may allow malicious traffic to reach your origin server, increasing the risk of attacks.",
"RelatedUrl": "https://developers.cloudflare.com/waf/tools/security-level/",
"Remediation": {
"Code": {
"CLI": "cloudflare firewall security-level set --zone-id <zone_id> --level medium",
"NativeIaC": "",
"Other": "https://dash.cloudflare.com/ -> Select Zone -> Security -> Settings -> Security Level -> Set to Medium or higher",
"Terraform": "resource \"cloudflare_zone_settings_override\" \"example\" {\n zone_id = var.zone_id\n settings {\n security_level = \"medium\"\n }\n}"
},
"Recommendation": {
"Text": "Set Security Level to Medium for all Cloudflare zones. Adjust to High or Under Attack during active attacks.",
"Url": "https://developers.cloudflare.com/waf/tools/security-level/"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Security Level can be temporarily increased to High or Under Attack during active attacks, but Medium is recommended for normal operation."
}

View File

@@ -0,0 +1,35 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.firewall.firewall_client import (
firewall_client,
)
class firewall_security_level_medium_or_higher(Check):
"""Check if Security Level is set to Medium or higher"""
def execute(self) -> List[CheckReportCloudflare]:
findings = []
# Security levels in order: off, essentially_off, low, medium, high, under_attack
acceptable_levels = ["medium", "high", "under_attack"]
for zone_id, security_settings in firewall_client.security_settings.items():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=security_settings,
resource_name=security_settings.zone_name,
resource_id=zone_id,
zone_name=security_settings.zone_name,
)
if security_settings.security_level in acceptable_levels:
report.status = "PASS"
report.status_extended = f"Zone {security_settings.zone_name} has Security Level set to '{security_settings.security_level}', providing adequate protection."
else:
report.status = "FAIL"
report.status_extended = f"Zone {security_settings.zone_name} has Security Level set to '{security_settings.security_level}'. Recommended: 'medium' or higher to balance protection with user accessibility."
findings.append(report)
return findings

View File

@@ -0,0 +1,191 @@
from pydantic.v1 import BaseModel
from prowler.lib.logger import logger
from prowler.providers.cloudflare.lib.service.service import CloudflareService
class Firewall(CloudflareService):
"""Cloudflare Firewall service for managing firewall rules and WAF settings"""
def __init__(self, provider):
super().__init__(__class__.__name__, provider)
self.zones = self._list_zones()
self.firewall_rules = self._list_firewall_rules()
self.security_settings = self._get_security_settings()
def _list_zones(self) -> dict:
"""
List all Cloudflare zones
Returns:
dict: Dictionary of zones keyed by zone ID
"""
logger.info("Firewall - Listing Zones...")
zones = {}
try:
# If specific zone IDs are provided, use those
if self.provider.zone_ids:
for zone_id in self.provider.zone_ids:
zone_data = self._api_request("GET", f"/zones/{zone_id}")
if zone_data:
zones[zone_data["id"]] = Zone(
id=zone_data["id"],
name=zone_data["name"],
account_id=zone_data.get("account", {}).get("id", ""),
status=zone_data.get("status", ""),
plan=zone_data.get("plan", {}).get("name", ""),
)
else:
# List all zones
all_zones = self._api_request_paginated("/zones")
for zone_data in all_zones:
zones[zone_data["id"]] = Zone(
id=zone_data["id"],
name=zone_data["name"],
account_id=zone_data.get("account", {}).get("id", ""),
status=zone_data.get("status", ""),
plan=zone_data.get("plan", {}).get("name", ""),
)
logger.info(f"Found {len(zones)} zone(s)")
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return zones
def _list_firewall_rules(self) -> dict:
"""
List firewall rules for all zones
Returns:
dict: Dictionary of firewall rules keyed by rule ID
"""
logger.info("Firewall - Listing Firewall Rules...")
firewall_rules = {}
try:
for zone_id, zone in self.zones.items():
# Get firewall rules for the zone
rules_data = self._api_request_paginated(
f"/zones/{zone_id}/firewall/rules"
)
for rule in rules_data:
firewall_rules[rule["id"]] = FirewallRule(
id=rule["id"],
zone_id=zone_id,
zone_name=zone.name,
paused=rule.get("paused", False),
description=rule.get("description", ""),
action=rule.get("action", ""),
priority=rule.get("priority", 0),
filter_id=rule.get("filter", {}).get("id", ""),
)
# Get WAF settings for the zone
waf_settings = self._api_request(
"GET", f"/zones/{zone_id}/firewall/waf/packages"
)
if waf_settings:
zone.waf_enabled = True
logger.info(f"Found {len(firewall_rules)} firewall rule(s)")
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return firewall_rules
def _get_security_settings(self) -> dict:
"""
Get security settings for all zones
Returns:
dict: Dictionary of security settings keyed by zone ID
"""
logger.info("Firewall - Getting Security Settings...")
security_settings = {}
try:
for zone_id, zone in self.zones.items():
# Get security level
security_level = self._api_request(
"GET", f"/zones/{zone_id}/settings/security_level"
)
# Get browser integrity check
browser_check = self._api_request(
"GET", f"/zones/{zone_id}/settings/browser_check"
)
# Get challenge passage
challenge_ttl = self._api_request(
"GET", f"/zones/{zone_id}/settings/challenge_ttl"
)
security_settings[zone_id] = SecuritySettings(
zone_id=zone_id,
zone_name=zone.name,
security_level=(
security_level.get("value", "") if security_level else ""
),
browser_integrity_check=(
browser_check.get("value", "off") == "on"
if browser_check
else False
),
challenge_ttl=(
challenge_ttl.get("value", 0) if challenge_ttl else 0
),
)
logger.info(
f"Retrieved security settings for {len(security_settings)} zone(s)"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return security_settings
class Zone(BaseModel):
"""Model for Cloudflare Zone"""
id: str
name: str
account_id: str
status: str
plan: str
waf_enabled: bool = False
class FirewallRule(BaseModel):
"""Model for Cloudflare Firewall Rule"""
id: str
zone_id: str
zone_name: str
paused: bool
description: str
action: str
priority: int
filter_id: str
class SecuritySettings(BaseModel):
"""Model for Cloudflare Security Settings"""
zone_id: str
zone_name: str
security_level: str
browser_integrity_check: bool
challenge_ttl: int

View File

@@ -0,0 +1,32 @@
{
"Provider": "cloudflare",
"CheckID": "firewall_waf_enabled",
"CheckTitle": "Ensure Web Application Firewall (WAF) is enabled",
"CheckType": [],
"ServiceName": "firewall",
"SubServiceName": "",
"ResourceIdTemplate": "zone_id",
"Severity": "high",
"ResourceType": "Zone",
"Description": "This check ensures that Web Application Firewall (WAF) is enabled for Cloudflare zones to protect against common web application attacks such as SQL injection, cross-site scripting (XSS), and other OWASP Top 10 vulnerabilities.",
"Risk": "Without WAF enabled, web applications are vulnerable to common attacks that could lead to data breaches, service disruptions, or unauthorized access.",
"RelatedUrl": "https://developers.cloudflare.com/waf/",
"Remediation": {
"Code": {
"CLI": "cloudflare firewall waf enable --zone-id <zone_id>",
"NativeIaC": "",
"Other": "https://dash.cloudflare.com/ -> Select Zone -> Security -> WAF -> Enable",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable Web Application Firewall (WAF) for all Cloudflare zones to protect against common web application attacks.",
"Url": "https://developers.cloudflare.com/waf/managed-rules/"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "WAF is available on Pro, Business, and Enterprise plans. Free plans have limited WAF capabilities."
}

View File

@@ -0,0 +1,36 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.firewall.firewall_client import (
firewall_client,
)
class firewall_waf_enabled(Check):
"""Check if Web Application Firewall (WAF) is enabled for Cloudflare zones
This class verifies whether each Cloudflare zone has WAF enabled to protect
against common web application attacks.
"""
def execute(self) -> List[CheckReportCloudflare]:
"""Execute the Cloudflare WAF enabled check
Iterates over all zones and checks if WAF is enabled.
Returns:
List[CheckReportCloudflare]: A list of reports for each zone
"""
findings = []
for zone_id, zone in firewall_client.zones.items():
report = CheckReportCloudflare(metadata=self.metadata(), resource=zone)
report.status = "FAIL"
report.status_extended = f"Zone {zone.name} does not have WAF enabled."
if zone.waf_enabled:
report.status = "PASS"
report.status_extended = f"Zone {zone.name} has WAF enabled."
findings.append(report)
return findings

View File

@@ -0,0 +1,32 @@
{
"Provider": "cloudflare",
"CheckID": "ssl_always_use_https",
"CheckTitle": "Ensure 'Always Use HTTPS' is enabled",
"CheckType": [],
"ServiceName": "ssl",
"SubServiceName": "",
"ResourceIdTemplate": "zone_id",
"Severity": "medium",
"ResourceType": "Zone",
"Description": "This check ensures that 'Always Use HTTPS' is enabled for Cloudflare zones to automatically redirect all HTTP requests to HTTPS, ensuring all traffic is encrypted.",
"Risk": "Without 'Always Use HTTPS' enabled, visitors may access the website over unencrypted HTTP connections, exposing sensitive data to interception and man-in-the-middle attacks.",
"RelatedUrl": "https://developers.cloudflare.com/ssl/edge-certificates/additional-options/always-use-https/",
"Remediation": {
"Code": {
"CLI": "curl -X PATCH \"https://api.cloudflare.com/v4/zones/<zone_id>/settings/always_use_https\" -H \"Authorization: Bearer <api_token>\" -H \"Content-Type: application/json\" -d '{\"value\":\"on\"}'",
"NativeIaC": "",
"Other": "https://dash.cloudflare.com/ -> Select Zone -> SSL/TLS -> Edge Certificates -> Always Use HTTPS -> On",
"Terraform": "resource \"cloudflare_zone_settings_override\" \"example\" {\n zone_id = var.zone_id\n settings {\n always_use_https = \"on\"\n }\n}"
},
"Recommendation": {
"Text": "Enable 'Always Use HTTPS' for all Cloudflare zones to ensure all traffic is encrypted and secure.",
"Url": "https://developers.cloudflare.com/ssl/edge-certificates/additional-options/always-use-https/"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This setting redirects all HTTP requests to HTTPS using a 301 permanent redirect."
}

View File

@@ -0,0 +1,42 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.ssl.ssl_client import ssl_client
class ssl_always_use_https(Check):
"""Check if Cloudflare zones have 'Always Use HTTPS' enabled
This class verifies that each Cloudflare zone has 'Always Use HTTPS' enabled
to automatically redirect HTTP requests to HTTPS.
"""
def execute(self) -> List[CheckReportCloudflare]:
"""Execute the Cloudflare Always Use HTTPS check
Iterates over all SSL settings and checks if Always Use HTTPS is enabled.
Returns:
List[CheckReportCloudflare]: A list of reports for each zone
"""
findings = []
for zone_id, ssl_settings in ssl_client.ssl_settings.items():
zone = ssl_client.zones.get(zone_id)
if not zone:
continue
report = CheckReportCloudflare(
metadata=self.metadata(), resource=ssl_settings
)
report.status = "FAIL"
report.status_extended = f"Zone {ssl_settings.zone_name} does not have 'Always Use HTTPS' enabled."
if ssl_settings.always_use_https:
report.status = "PASS"
report.status_extended = (
f"Zone {ssl_settings.zone_name} has 'Always Use HTTPS' enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,32 @@
{
"Provider": "cloudflare",
"CheckID": "ssl_automatic_https_rewrites_enabled",
"CheckTitle": "Ensure Automatic HTTPS Rewrites is enabled to resolve mixed content issues",
"CheckType": [],
"ServiceName": "ssl",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Zone",
"Description": "This check ensures that Automatic HTTPS Rewrites is enabled for Cloudflare zones to automatically rewrite insecure HTTP links to secure HTTPS links, resolving mixed content issues and enhancing site security.",
"Risk": "Without Automatic HTTPS Rewrites, pages may contain mixed content (HTTP resources loaded over HTTPS pages), which browsers block or warn about, degrading user experience and security.",
"RelatedUrl": "https://developers.cloudflare.com/ssl/edge-certificates/additional-options/automatic-https-rewrites/",
"Remediation": {
"Code": {
"CLI": "cloudflare ssl automatic-https-rewrites enable --zone-id <zone_id>",
"NativeIaC": "",
"Other": "https://dash.cloudflare.com/ -> Select Zone -> SSL/TLS -> Edge Certificates -> Enable Automatic HTTPS Rewrites",
"Terraform": "resource \"cloudflare_zone_settings_override\" \"example\" {\n zone_id = var.zone_id\n settings {\n automatic_https_rewrites = \"on\"\n }\n}"
},
"Recommendation": {
"Text": "Enable Automatic HTTPS Rewrites for all Cloudflare zones to prevent mixed content warnings and ensure all resources load securely.",
"Url": "https://developers.cloudflare.com/ssl/edge-certificates/additional-options/automatic-https-rewrites/"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This feature works best when combined with Always Use HTTPS to ensure the entire site is served over HTTPS."
}

View File

@@ -0,0 +1,31 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.ssl.ssl_client import ssl_client
class ssl_automatic_https_rewrites_enabled(Check):
"""Check if Automatic HTTPS Rewrites is enabled to resolve mixed content issues"""
def execute(self) -> List[CheckReportCloudflare]:
findings = []
for zone_id, ssl_settings in ssl_client.ssl_settings.items():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=ssl_settings,
resource_name=ssl_settings.zone_name,
resource_id=zone_id,
zone_name=ssl_settings.zone_name,
)
if ssl_settings.automatic_https_rewrites:
report.status = "PASS"
report.status_extended = f"Zone {ssl_settings.zone_name} has Automatic HTTPS Rewrites enabled, resolving mixed content issues and enhancing site security."
else:
report.status = "FAIL"
report.status_extended = f"Zone {ssl_settings.zone_name} does not have Automatic HTTPS Rewrites enabled. Enable it to automatically rewrite HTTP links to HTTPS and prevent mixed content warnings."
findings.append(report)
return findings

View File

@@ -0,0 +1,4 @@
from prowler.providers.cloudflare.services.ssl.ssl_service import SSL
from prowler.providers.common.provider import Provider
ssl_client = SSL(Provider.get_global_provider())

View File

@@ -0,0 +1,33 @@
{
"Provider": "cloudflare",
"CheckID": "ssl_hsts_enabled",
"CheckTitle": "Ensure HSTS (HTTP Strict Transport Security) is enabled with recommended max-age",
"CheckType": [],
"ServiceName": "ssl",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Zone",
"Description": "This check ensures that HSTS (HTTP Strict Transport Security) is enabled for Cloudflare zones with a recommended max-age of at least 6 months (15768000 seconds) to prevent SSL stripping and man-in-the-middle attacks.",
"Risk": "Without HSTS enabled, browsers may initially connect over HTTP, making the connection vulnerable to SSL stripping attacks where an attacker downgrades the connection to unencrypted HTTP.",
"RelatedUrl": "https://developers.cloudflare.com/ssl/edge-certificates/additional-options/http-strict-transport-security/",
"Remediation": {
"Code": {
"CLI": "cloudflare ssl hsts enable --zone-id <zone_id> --max-age 31536000",
"NativeIaC": "",
"Other": "https://dash.cloudflare.com/ -> Select Zone -> SSL/TLS -> Edge Certificates -> Enable HSTS",
"Terraform": "resource \"cloudflare_zone_settings_override\" \"example\" {\n zone_id = var.zone_id\n settings {\n security_header {\n enabled = true\n max_age = 31536000\n include_subdomains = true\n preload = true\n }\n }\n}"
},
"Recommendation": {
"Text": "Enable HSTS for all Cloudflare zones with a max-age of at least 6 months (15768000 seconds) to prevent SSL stripping attacks.",
"Url": "https://developers.cloudflare.com/ssl/edge-certificates/additional-options/http-strict-transport-security/"
}
},
"Categories": [
"encryption",
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "HSTS requires HTTPS to be properly configured. Ensure all resources are accessible via HTTPS before enabling HSTS with a long max-age."
}

View File

@@ -0,0 +1,37 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.ssl.ssl_client import ssl_client
class ssl_hsts_enabled(Check):
"""Check if HSTS (HTTP Strict Transport Security) is enabled with recommended max-age"""
def execute(self) -> List[CheckReportCloudflare]:
findings = []
# Recommended minimum max-age is 6 months (15768000 seconds)
recommended_max_age = 15768000
for zone_id, ssl_settings in ssl_client.ssl_settings.items():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=ssl_settings,
resource_name=ssl_settings.zone_name,
resource_id=zone_id,
zone_name=ssl_settings.zone_name,
)
if ssl_settings.hsts_enabled:
if ssl_settings.hsts_max_age >= recommended_max_age:
report.status = "PASS"
report.status_extended = f"Zone {ssl_settings.zone_name} has HSTS enabled with max-age of {ssl_settings.hsts_max_age} seconds (>= {recommended_max_age} recommended)."
else:
report.status = "FAIL"
report.status_extended = f"Zone {ssl_settings.zone_name} has HSTS enabled but max-age is {ssl_settings.hsts_max_age} seconds (< {recommended_max_age} recommended). Increase max-age for better security."
else:
report.status = "FAIL"
report.status_extended = f"Zone {ssl_settings.zone_name} does not have HSTS enabled. Enable HSTS to prevent SSL stripping and man-in-the-middle attacks."
findings.append(report)
return findings

View File

@@ -0,0 +1,32 @@
{
"Provider": "cloudflare",
"CheckID": "ssl_hsts_include_subdomains",
"CheckTitle": "Ensure HSTS includes subdomains for comprehensive protection",
"CheckType": [],
"ServiceName": "ssl",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Zone",
"Description": "This check ensures that HSTS (HTTP Strict Transport Security) is configured with the includeSubDomains directive to apply HSTS policy uniformly across the entire domain including all subdomains.",
"Risk": "Without includeSubDomains directive, subdomains may be vulnerable to SSL stripping attacks even if the main domain has HSTS enabled.",
"RelatedUrl": "https://developers.cloudflare.com/ssl/edge-certificates/additional-options/http-strict-transport-security/",
"Remediation": {
"Code": {
"CLI": "cloudflare ssl hsts enable --zone-id <zone_id> --include-subdomains",
"NativeIaC": "",
"Other": "https://dash.cloudflare.com/ -> Select Zone -> SSL/TLS -> Edge Certificates -> HSTS -> Enable 'Include subdomains'",
"Terraform": "resource \"cloudflare_zone_settings_override\" \"example\" {\n zone_id = var.zone_id\n settings {\n security_header {\n enabled = true\n include_subdomains = true\n max_age = 31536000\n }\n }\n}"
},
"Recommendation": {
"Text": "Enable HSTS with includeSubDomains directive for all Cloudflare zones to ensure all subdomains are protected.",
"Url": "https://developers.cloudflare.com/ssl/edge-certificates/additional-options/http-strict-transport-security/"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Ensure all subdomains are accessible via HTTPS before enabling includeSubDomains to avoid accessibility issues."
}

View File

@@ -0,0 +1,34 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.ssl.ssl_client import ssl_client
class ssl_hsts_include_subdomains(Check):
"""Check if HSTS includes subdomains for comprehensive protection"""
def execute(self) -> List[CheckReportCloudflare]:
findings = []
for zone_id, ssl_settings in ssl_client.ssl_settings.items():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=ssl_settings,
resource_name=ssl_settings.zone_name,
resource_id=zone_id,
zone_name=ssl_settings.zone_name,
)
if ssl_settings.hsts_enabled and ssl_settings.hsts_include_subdomains:
report.status = "PASS"
report.status_extended = f"Zone {ssl_settings.zone_name} has HSTS enabled with includeSubDomains directive, protecting all subdomains."
elif ssl_settings.hsts_enabled and not ssl_settings.hsts_include_subdomains:
report.status = "FAIL"
report.status_extended = f"Zone {ssl_settings.zone_name} has HSTS enabled but does not include subdomains. Enable includeSubDomains to protect all subdomains."
else:
report.status = "FAIL"
report.status_extended = f"Zone {ssl_settings.zone_name} does not have HSTS enabled. Enable HSTS with includeSubDomains directive."
findings.append(report)
return findings

View File

@@ -0,0 +1,32 @@
{
"Provider": "cloudflare",
"CheckID": "ssl_mode_full_strict",
"CheckTitle": "Ensure SSL/TLS mode is set to Full (strict) for end-to-end encryption",
"CheckType": [],
"ServiceName": "ssl",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Zone",
"Description": "This check ensures that SSL/TLS mode is set to Full (strict) for Cloudflare zones to ensure end-to-end encryption with certificate validation between Cloudflare and origin servers.",
"Risk": "Using flexible or off SSL modes can expose traffic between Cloudflare and origin servers to interception. Full (strict) mode ensures encrypted connections and validates origin server certificates.",
"RelatedUrl": "https://developers.cloudflare.com/ssl/origin-configuration/ssl-modes/",
"Remediation": {
"Code": {
"CLI": "cloudflare ssl mode update --zone-id <zone_id> --mode full",
"NativeIaC": "",
"Other": "https://dash.cloudflare.com/ -> Select Zone -> SSL/TLS -> Overview -> Set to 'Full (strict)'",
"Terraform": "resource \"cloudflare_zone_settings_override\" \"example\" {\n zone_id = var.zone_id\n settings {\n ssl = \"full\"\n }\n}"
},
"Recommendation": {
"Text": "Set SSL/TLS mode to Full (strict) for all Cloudflare zones to ensure end-to-end encryption with proper certificate validation.",
"Url": "https://developers.cloudflare.com/ssl/origin-configuration/ssl-modes/"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Full (strict) mode requires a valid SSL certificate on the origin server. Ensure your origin has a trusted certificate before enabling."
}

View File

@@ -0,0 +1,31 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.ssl.ssl_client import ssl_client
class ssl_mode_full_strict(Check):
"""Check if SSL/TLS mode is set to Full (strict) for end-to-end encryption"""
def execute(self) -> List[CheckReportCloudflare]:
findings = []
for zone_id, ssl_settings in ssl_client.ssl_settings.items():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=ssl_settings,
resource_name=ssl_settings.zone_name,
resource_id=zone_id,
zone_name=ssl_settings.zone_name,
)
# SSL mode should be "full" or "strict" for end-to-end encryption
if ssl_settings.ssl_mode in ["full", "strict"]:
report.status = "PASS"
report.status_extended = f"Zone {ssl_settings.zone_name} has SSL/TLS mode set to '{ssl_settings.ssl_mode}' ensuring end-to-end encryption."
else:
report.status = "FAIL"
report.status_extended = f"Zone {ssl_settings.zone_name} has SSL/TLS mode set to '{ssl_settings.ssl_mode}'. Recommended: 'full' or 'strict' for end-to-end encryption with certificate validation."
findings.append(report)
return findings

View File

@@ -0,0 +1,32 @@
{
"Provider": "cloudflare",
"CheckID": "ssl_opportunistic_encryption_enabled",
"CheckTitle": "Ensure Opportunistic Encryption is enabled for HTTP/2 benefits",
"CheckType": [],
"ServiceName": "ssl",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "low",
"ResourceType": "Zone",
"Description": "This check ensures that Opportunistic Encryption is enabled for Cloudflare zones to provide HTTP/2 benefits over encrypted connections, even for visitors using HTTP.",
"Risk": "Without Opportunistic Encryption, HTTP visitors cannot benefit from HTTP/2 performance improvements such as multiplexing and server push.",
"RelatedUrl": "https://developers.cloudflare.com/ssl/edge-certificates/additional-options/opportunistic-encryption/",
"Remediation": {
"Code": {
"CLI": "cloudflare ssl opportunistic-encryption enable --zone-id <zone_id>",
"NativeIaC": "",
"Other": "https://dash.cloudflare.com/ -> Select Zone -> SSL/TLS -> Edge Certificates -> Enable Opportunistic Encryption",
"Terraform": "resource \"cloudflare_zone_settings_override\" \"example\" {\n zone_id = var.zone_id\n settings {\n opportunistic_encryption = \"on\"\n }\n}"
},
"Recommendation": {
"Text": "Enable Opportunistic Encryption for all Cloudflare zones to provide HTTP/2 benefits to all visitors.",
"Url": "https://developers.cloudflare.com/ssl/edge-certificates/additional-options/opportunistic-encryption/"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Opportunistic Encryption allows HTTP/2 over TLS for HTTP visitors, providing performance benefits without requiring HTTPS."
}

View File

@@ -0,0 +1,31 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.ssl.ssl_client import ssl_client
class ssl_opportunistic_encryption_enabled(Check):
"""Check if Opportunistic Encryption is enabled for HTTP/2 benefits"""
def execute(self) -> List[CheckReportCloudflare]:
findings = []
for zone_id, ssl_settings in ssl_client.ssl_settings.items():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=ssl_settings,
resource_name=ssl_settings.zone_name,
resource_id=zone_id,
zone_name=ssl_settings.zone_name,
)
if ssl_settings.opportunistic_encryption:
report.status = "PASS"
report.status_extended = f"Zone {ssl_settings.zone_name} has Opportunistic Encryption enabled, providing HTTP/2 benefits over encrypted connections."
else:
report.status = "FAIL"
report.status_extended = f"Zone {ssl_settings.zone_name} does not have Opportunistic Encryption enabled. Enable it to provide HTTP/2 benefits over encrypted connections."
findings.append(report)
return findings

View File

@@ -0,0 +1,181 @@
from pydantic.v1 import BaseModel
from prowler.lib.logger import logger
from prowler.providers.cloudflare.lib.service.service import CloudflareService
class SSL(CloudflareService):
"""Cloudflare SSL/TLS service for managing SSL settings"""
def __init__(self, provider):
super().__init__(__class__.__name__, provider)
self.zones = self._list_zones()
self.ssl_settings = self._get_ssl_settings()
def _list_zones(self) -> dict:
"""
List all Cloudflare zones
Returns:
dict: Dictionary of zones keyed by zone ID
"""
logger.info("SSL - Listing Zones...")
zones = {}
try:
# If specific zone IDs are provided, use those
if self.provider.zone_ids:
for zone_id in self.provider.zone_ids:
zone_data = self._api_request("GET", f"/zones/{zone_id}")
if zone_data:
zones[zone_data["id"]] = Zone(
id=zone_data["id"],
name=zone_data["name"],
account_id=zone_data.get("account", {}).get("id", ""),
)
else:
# List all zones
all_zones = self._api_request_paginated("/zones")
for zone_data in all_zones:
zones[zone_data["id"]] = Zone(
id=zone_data["id"],
name=zone_data["name"],
account_id=zone_data.get("account", {}).get("id", ""),
)
logger.info(f"Found {len(zones)} zone(s) for SSL checks")
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return zones
def _get_ssl_settings(self) -> dict:
"""
Get SSL/TLS settings for all zones
Returns:
dict: Dictionary of SSL settings keyed by zone ID
"""
logger.info("SSL - Getting SSL/TLS Settings...")
ssl_settings = {}
try:
for zone_id, zone in self.zones.items():
# Get SSL/TLS mode
ssl_mode = self._api_request("GET", f"/zones/{zone_id}/settings/ssl")
# Get minimum TLS version
min_tls = self._api_request(
"GET", f"/zones/{zone_id}/settings/min_tls_version"
)
# Get TLS 1.3 setting
tls_1_3 = self._api_request("GET", f"/zones/{zone_id}/settings/tls_1_3")
# Get automatic HTTPS rewrites
auto_https = self._api_request(
"GET", f"/zones/{zone_id}/settings/automatic_https_rewrites"
)
# Get always use HTTPS
always_https = self._api_request(
"GET", f"/zones/{zone_id}/settings/always_use_https"
)
# Get opportunistic encryption
opportunistic = self._api_request(
"GET", f"/zones/{zone_id}/settings/opportunistic_encryption"
)
# Get HSTS settings
hsts = self._api_request(
"GET", f"/zones/{zone_id}/settings/security_header"
)
ssl_settings[zone_id] = SSLSettings(
zone_id=zone_id,
zone_name=zone.name,
ssl_mode=ssl_mode.get("value", "") if ssl_mode else "",
min_tls_version=(min_tls.get("value", "") if min_tls else "1.0"),
tls_1_3_enabled=(
tls_1_3.get("value", "off") == "on" if tls_1_3 else False
),
automatic_https_rewrites=(
auto_https.get("value", "off") == "on" if auto_https else False
),
always_use_https=(
always_https.get("value", "off") == "on"
if always_https
else False
),
opportunistic_encryption=(
opportunistic.get("value", "off") == "on"
if opportunistic
else False
),
hsts_enabled=(
hsts.get("value", {})
.get("strict_transport_security", {})
.get("enabled", False)
if hsts
else False
),
hsts_max_age=(
hsts.get("value", {})
.get("strict_transport_security", {})
.get("max_age", 0)
if hsts
else 0
),
hsts_include_subdomains=(
hsts.get("value", {})
.get("strict_transport_security", {})
.get("include_subdomains", False)
if hsts
else False
),
hsts_preload=(
hsts.get("value", {})
.get("strict_transport_security", {})
.get("preload", False)
if hsts
else False
),
)
logger.info(f"Retrieved SSL settings for {len(ssl_settings)} zone(s)")
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return ssl_settings
class Zone(BaseModel):
"""Model for Cloudflare Zone"""
id: str
name: str
account_id: str
class SSLSettings(BaseModel):
"""Model for Cloudflare SSL/TLS Settings"""
zone_id: str
zone_name: str
ssl_mode: str
min_tls_version: str
tls_1_3_enabled: bool
automatic_https_rewrites: bool
always_use_https: bool
opportunistic_encryption: bool
hsts_enabled: bool
hsts_max_age: int
hsts_include_subdomains: bool
hsts_preload: bool

View File

@@ -0,0 +1,32 @@
{
"Provider": "cloudflare",
"CheckID": "ssl_tls_1_3_enabled",
"CheckTitle": "Ensure TLS 1.3 is enabled for enhanced security and performance",
"CheckType": [],
"ServiceName": "ssl",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Zone",
"Description": "This check ensures that TLS 1.3 is enabled for Cloudflare zones to activate the latest TLS protocol, which streamlines the TLS handshake, enhances security, and reduces connection time.",
"Risk": "Without TLS 1.3 enabled, connections use older TLS versions which have longer handshake times and may be vulnerable to known attacks.",
"RelatedUrl": "https://developers.cloudflare.com/ssl/edge-certificates/additional-options/tls-13/",
"Remediation": {
"Code": {
"CLI": "cloudflare ssl tls-1-3 enable --zone-id <zone_id>",
"NativeIaC": "",
"Other": "https://dash.cloudflare.com/ -> Select Zone -> SSL/TLS -> Edge Certificates -> Enable TLS 1.3",
"Terraform": "resource \"cloudflare_zone_settings_override\" \"example\" {\n zone_id = var.zone_id\n settings {\n tls_1_3 = \"on\"\n }\n}"
},
"Recommendation": {
"Text": "Enable TLS 1.3 for all Cloudflare zones to take advantage of improved security and performance.",
"Url": "https://developers.cloudflare.com/ssl/edge-certificates/additional-options/tls-13/"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "TLS 1.3 is supported by modern browsers and provides significant security and performance improvements."
}

View File

@@ -0,0 +1,31 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.ssl.ssl_client import ssl_client
class ssl_tls_1_3_enabled(Check):
"""Check if TLS 1.3 is enabled for enhanced security and performance"""
def execute(self) -> List[CheckReportCloudflare]:
findings = []
for zone_id, ssl_settings in ssl_client.ssl_settings.items():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=ssl_settings,
resource_name=ssl_settings.zone_name,
resource_id=zone_id,
zone_name=ssl_settings.zone_name,
)
if ssl_settings.tls_1_3_enabled:
report.status = "PASS"
report.status_extended = f"Zone {ssl_settings.zone_name} has TLS 1.3 enabled, providing enhanced security and reduced connection time."
else:
report.status = "FAIL"
report.status_extended = f"Zone {ssl_settings.zone_name} does not have TLS 1.3 enabled. Enable TLS 1.3 for improved security and performance."
findings.append(report)
return findings

View File

@@ -0,0 +1,32 @@
{
"Provider": "cloudflare",
"CheckID": "ssl_tls_minimum_version",
"CheckTitle": "Ensure minimum TLS version is set to 1.2 or higher",
"CheckType": [],
"ServiceName": "ssl",
"SubServiceName": "",
"ResourceIdTemplate": "zone_id",
"Severity": "high",
"ResourceType": "Zone",
"Description": "This check ensures that Cloudflare zones enforce a minimum TLS version of 1.2 or higher. TLS 1.0 and 1.1 are deprecated and have known security vulnerabilities.",
"Risk": "Using outdated TLS versions (1.0 and 1.1) exposes connections to known security vulnerabilities and does not meet modern security standards. This can lead to man-in-the-middle attacks and data interception.",
"RelatedUrl": "https://developers.cloudflare.com/ssl/edge-certificates/additional-options/minimum-tls/",
"Remediation": {
"Code": {
"CLI": "curl -X PATCH \"https://api.cloudflare.com/v4/zones/<zone_id>/settings/min_tls_version\" -H \"Authorization: Bearer <api_token>\" -H \"Content-Type: application/json\" -d '{\"value\":\"1.2\"}'",
"NativeIaC": "",
"Other": "https://dash.cloudflare.com/ -> Select Zone -> SSL/TLS -> Edge Certificates -> Minimum TLS Version -> Set to 1.2 or higher",
"Terraform": "resource \"cloudflare_zone_settings_override\" \"example\" {\n zone_id = var.zone_id\n settings {\n min_tls_version = \"1.2\"\n }\n}"
},
"Recommendation": {
"Text": "Set the minimum TLS version to 1.2 or 1.3 for all Cloudflare zones to ensure secure encrypted connections.",
"Url": "https://developers.cloudflare.com/ssl/edge-certificates/additional-options/minimum-tls/"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "TLS 1.0 and 1.1 were deprecated by major browsers in 2020. TLS 1.2 is the current recommended minimum version."
}

View File

@@ -0,0 +1,41 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.ssl.ssl_client import ssl_client
class ssl_tls_minimum_version(Check):
"""Check if Cloudflare zones have minimum TLS version set to 1.2 or higher
This class verifies that each Cloudflare zone enforces a minimum TLS version
of 1.2 or higher to ensure secure connections.
"""
def execute(self) -> List[CheckReportCloudflare]:
"""Execute the Cloudflare minimum TLS version check
Iterates over all SSL settings and checks the minimum TLS version.
Returns:
List[CheckReportCloudflare]: A list of reports for each zone
"""
findings = []
for zone_id, ssl_settings in ssl_client.ssl_settings.items():
zone = ssl_client.zones.get(zone_id)
if not zone:
continue
report = CheckReportCloudflare(
metadata=self.metadata(), resource=ssl_settings
)
report.status = "FAIL"
report.status_extended = f"Zone {ssl_settings.zone_name} has minimum TLS version set to {ssl_settings.min_tls_version}, which is below the recommended 1.2."
# Check if minimum TLS version is 1.2 or higher
if ssl_settings.min_tls_version in ["1.2", "1.3"]:
report.status = "PASS"
report.status_extended = f"Zone {ssl_settings.zone_name} has minimum TLS version set to {ssl_settings.min_tls_version}."
findings.append(report)
return findings

View File

@@ -302,6 +302,17 @@ class Provider(ABC):
fixer_config=fixer_config,
use_instance_principal=arguments.use_instance_principal,
)
elif "cloudflare" in provider_class_name.lower():
provider_class(
api_token=arguments.api_token,
api_key=arguments.api_key,
api_email=arguments.api_email,
account_ids=arguments.account_id,
zone_ids=arguments.zone_id,
config_path=arguments.config_file,
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
except TypeError as error:
logger.critical(