feat(cloudflare): Add TLS/SSL, records and email security checks for zones (#9424)

Co-authored-by: Andoni Alonso <14891798+andoniaf@users.noreply.github.com>
This commit is contained in:
Hugo Pereira Brito
2026-01-15 09:31:27 +01:00
committed by GitHub
parent 1bf49747ad
commit d4bc6d7531
52 changed files with 3613 additions and 20 deletions

15
.github/labeler.yml vendored
View File

@@ -46,12 +46,17 @@ provider/oci:
- changed-files:
- any-glob-to-any-file: "prowler/providers/oraclecloud/**"
- any-glob-to-any-file: "tests/providers/oraclecloud/**"
provider/alibabacloud:
- changed-files:
- any-glob-to-any-file: "prowler/providers/alibabacloud/**"
- any-glob-to-any-file: "tests/providers/alibabacloud/**"
provider/cloudflare:
- changed-files:
- any-glob-to-any-file: "prowler/providers/cloudflare/**"
- any-glob-to-any-file: "tests/providers/cloudflare/**"
github_actions:
- changed-files:
- any-glob-to-any-file: ".github/workflows/*"
@@ -67,15 +72,21 @@ mutelist:
- any-glob-to-any-file: "prowler/providers/azure/lib/mutelist/**"
- any-glob-to-any-file: "prowler/providers/gcp/lib/mutelist/**"
- any-glob-to-any-file: "prowler/providers/kubernetes/lib/mutelist/**"
- any-glob-to-any-file: "prowler/providers/m365/lib/mutelist/**"
- any-glob-to-any-file: "prowler/providers/mongodbatlas/lib/mutelist/**"
- any-glob-to-any-file: "prowler/providers/oraclecloud/lib/mutelist/**"
- any-glob-to-any-file: "prowler/providers/alibabacloud/lib/mutelist/**"
- any-glob-to-any-file: "prowler/providers/cloudflare/lib/mutelist/**"
- any-glob-to-any-file: "tests/lib/mutelist/**"
- any-glob-to-any-file: "tests/providers/aws/lib/mutelist/**"
- any-glob-to-any-file: "tests/providers/azure/lib/mutelist/**"
- any-glob-to-any-file: "tests/providers/gcp/lib/mutelist/**"
- any-glob-to-any-file: "tests/providers/kubernetes/lib/mutelist/**"
- any-glob-to-any-file: "tests/providers/m365/lib/mutelist/**"
- any-glob-to-any-file: "tests/providers/mongodbatlas/lib/mutelist/**"
- any-glob-to-any-file: "tests/providers/oci/lib/mutelist/**"
- any-glob-to-any-file: "tests/providers/oraclecloud/lib/mutelist/**"
- any-glob-to-any-file: "tests/providers/alibabacloud/lib/mutelist/**"
- any-glob-to-any-file: "tests/providers/cloudflare/lib/mutelist/**"
integration/s3:
- changed-files:

View File

@@ -32,6 +32,7 @@ The supported providers right now are:
| [M365](/user-guide/providers/microsoft365/getting-started-m365) | Official | Tenants | UI, API, CLI |
| [Github](/user-guide/providers/github/getting-started-github) | Official | Organizations / Repositories | UI, API, CLI |
| [Oracle Cloud](/user-guide/providers/oci/getting-started-oci) | Official | Tenancies / Compartments | UI, API, CLI |
| [Cloudflare](/user-guide/providers/cloudflare/getting-started-cloudflare) | Official | Accounts | CLI |
| [Infra as Code](/user-guide/providers/iac/getting-started-iac) | Official | Repositories | UI, API, CLI |
| [MongoDB Atlas](/user-guide/providers/mongodbatlas/getting-started-mongodbatlas) | Official | Organizations | UI, API, CLI |
| [LLM](/user-guide/providers/llm/getting-started-llm) | Official | Models | CLI |

View File

@@ -2,6 +2,10 @@
title: 'Getting Started with Cloudflare'
---
import { VersionBadge } from "/snippets/version-badge.mdx";
<VersionBadge version="5.17.0" />
Prowler for Cloudflare allows you to scan your Cloudflare zones for security misconfigurations, including SSL/TLS settings, DNSSEC, HSTS, and more.
## Prerequisites

View File

@@ -18,6 +18,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `Cloudflare` provider with critical security checks [(#9423)](https://github.com/prowler-cloud/prowler/pull/9423)
- `compute_instance_single_network_interface` check for GCP provider [(#9702)](https://github.com/prowler-cloud/prowler/pull/9702)
- `compute_image_not_publicly_shared` check for GCP provider [(#9718)](https://github.com/prowler-cloud/prowler/pull/9718)
- `TLS/SSL`, `records` and `email` checks for `zone` service [(#9424)](https://github.com/prowler-cloud/prowler/pull/9424)
- `compute_snapshot_not_outdated` check for GCP provider [(#9774)](https://github.com/prowler-cloud/prowler/pull/9774)
- CIS 1.12 compliance framework for Kubernetes [(#9778)](https://github.com/prowler-cloud/prowler/pull/9778)
- CIS 6.0 for M365 provider [(#9779)](https://github.com/prowler-cloud/prowler/pull/9779)

View File

@@ -35,12 +35,12 @@ class CloudflareProvider(Provider):
_audit_config: dict
_fixer_config: dict
_mutelist: CloudflareMutelist
_filter_zone: set[str] | None
_filter_zones: set[str] | None
audit_metadata: Audit_Metadata
def __init__(
self,
filter_zone: Iterable[str] | None = None,
filter_zones: Iterable[str] | None = None,
config_path: str = None,
config_content: dict | None = None,
fixer_config: dict = {},
@@ -72,7 +72,7 @@ class CloudflareProvider(Provider):
self._mutelist = CloudflareMutelist(mutelist_path=mutelist_path)
# Store zone filter for filtering resources across services
self._filter_zone = set(filter_zone) if filter_zone else None
self._filter_zones = set(filter_zones) if filter_zones else None
Provider.set_global_provider(self)
@@ -101,9 +101,9 @@ class CloudflareProvider(Provider):
return self._mutelist
@property
def filter_zone(self) -> set[str] | None:
def filter_zones(self) -> set[str] | None:
"""Zone filter from --region argument to filter resources."""
return self._filter_zone
return self._filter_zones
@property
def accounts(self) -> list[CloudflareAccount]:

View File

@@ -0,0 +1,4 @@
from prowler.providers.cloudflare.services.dns.dns_service import DNS
from prowler.providers.common.provider import Provider
dns_client = DNS(Provider.get_global_provider())

View File

@@ -0,0 +1,64 @@
from typing import Optional
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.cloudflare.lib.service.service import CloudflareService
from prowler.providers.cloudflare.services.zone.zone_client import zone_client
class DNS(CloudflareService):
"""Retrieve Cloudflare DNS records for all zones."""
def __init__(self, provider):
super().__init__(__class__.__name__, provider)
self.records: list["CloudflareDNSRecord"] = []
self._list_dns_records()
def _list_dns_records(self) -> None:
"""List DNS records for all zones."""
logger.info("DNS - Listing DNS records...")
try:
for zone in zone_client.zones.values():
seen_record_ids: set[str] = set()
try:
for record in self.client.dns.records.list(zone_id=zone.id):
record_id = getattr(record, "id", None)
# Prevent infinite loop
if record_id in seen_record_ids:
break
seen_record_ids.add(record_id)
self.records.append(
CloudflareDNSRecord(
id=record_id,
zone_id=zone.id,
zone_name=zone.name,
name=getattr(record, "name", None),
type=getattr(record, "type", None),
content=getattr(record, "content", ""),
ttl=getattr(record, "ttl", None),
proxied=getattr(record, "proxied", False),
)
)
except Exception as error:
logger.error(
f"{zone.id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class CloudflareDNSRecord(BaseModel):
"""Cloudflare DNS record representation."""
id: str
zone_id: str
zone_name: str
name: Optional[str] = None
type: Optional[str] = None
content: str = ""
ttl: Optional[int] = None
proxied: bool = False

View File

@@ -0,0 +1,36 @@
{
"Provider": "cloudflare",
"CheckID": "zone_automatic_https_rewrites_enabled",
"CheckTitle": "Automatic HTTPS Rewrites is enabled",
"CheckType": [],
"ServiceName": "zone",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Zone",
"ResourceGroup": "network",
"Description": "**Cloudflare zones** are assessed for **Automatic HTTPS Rewrites** configuration by checking if it is enabled to automatically rewrite insecure HTTP links to HTTPS, resolving **mixed content issues** and enhancing site security.",
"Risk": "Without **Automatic HTTPS Rewrites**, pages may contain mixed content where HTTP resources load over HTTPS pages.\n- **Confidentiality**: insecure resources can be intercepted and modified by attackers\n- **Integrity**: browsers block or warn about mixed content, indicating potential tampering\n- **User Experience**: security warnings degrade trust and some browsers block mixed content entirely",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developers.cloudflare.com/ssl/edge-certificates/additional-options/automatic-https-rewrites/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Log in to the Cloudflare dashboard and select your account and domain\n2. Go to SSL/TLS > Edge Certificates\n3. Scroll to Automatic HTTPS Rewrites\n4. Toggle the setting to On\n5. Verify that your site loads correctly without mixed content warnings",
"Terraform": "```hcl\n# Enable Automatic HTTPS Rewrites to fix mixed content\nresource \"cloudflare_zone_settings_override\" \"https_rewrites\" {\n zone_id = \"<ZONE_ID>\"\n settings {\n automatic_https_rewrites = \"on\" # Critical: automatically rewrites HTTP URLs to HTTPS\n }\n}\n```"
},
"Recommendation": {
"Text": "Enable **Automatic HTTPS Rewrites** as part of a comprehensive HTTPS strategy.\n- Automatically fixes mixed content by rewriting HTTP URLs to HTTPS\n- Combine with **Always Use HTTPS** and **HSTS** for defense in depth\n- Works best when all resources are available over HTTPS\n- Monitor for resources that cannot be rewritten and fix them at the source",
"Url": "https://hub.prowler.com/checks/cloudflare/zone_automatic_https_rewrites_enabled"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This feature works best when combined with Always Use HTTPS to ensure the entire site is served over HTTPS. Some resources may not be rewritable if they don't support HTTPS."
}

View File

@@ -0,0 +1,45 @@
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.zone.zone_client import zone_client
class zone_automatic_https_rewrites_enabled(Check):
"""Ensure that Automatic HTTPS Rewrites is enabled for Cloudflare zones.
Automatic HTTPS Rewrites automatically rewrites insecure HTTP links to HTTPS,
resolving mixed content issues and enhancing site security. This feature scans
HTML responses and rewrites HTTP URLs to HTTPS for resources that are known to
be available over a secure connection, preventing mixed content warnings.
"""
def execute(self) -> list[CheckReportCloudflare]:
"""Execute the Automatic HTTPS Rewrites enabled check.
Iterates through all Cloudflare zones and verifies that Automatic HTTPS
Rewrites is enabled. This setting automatically fixes mixed content issues
by rewriting HTTP links to HTTPS where possible.
Returns:
A list of CheckReportCloudflare objects with PASS status if Automatic
HTTPS Rewrites is enabled, or FAIL status if it is disabled for the zone.
"""
findings = []
for zone in zone_client.zones.values():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=zone,
)
automatic_https_rewrites = (
zone.settings.automatic_https_rewrites or ""
).lower()
if automatic_https_rewrites == "on":
report.status = "PASS"
report.status_extended = (
f"Automatic HTTPS Rewrites is enabled for zone {zone.name}."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Automatic HTTPS Rewrites is not enabled for zone {zone.name}."
)
findings.append(report)
return findings

View File

@@ -8,6 +8,7 @@
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Zone",
"ResourceGroup": "network",
"Description": "**Cloudflare zones** are assessed for **DNSSEC** configuration by checking if it is enabled to **cryptographically sign DNS responses** and protect against DNS spoofing and cache poisoning attacks.",
"Risk": "Without **DNSSEC**, DNS responses can be spoofed or modified by attackers.\n- **Confidentiality**: users can be redirected to malicious sites that harvest credentials\n- **Integrity**: DNS hijacking enables man-in-the-middle attacks and content modification\n- **Availability**: cache poisoning can cause denial of service by directing traffic to non-existent servers",
"RelatedUrl": "",

View File

@@ -0,0 +1,36 @@
{
"Provider": "cloudflare",
"CheckID": "zone_email_obfuscation_enabled",
"CheckTitle": "Email Obfuscation is enabled",
"CheckType": [],
"ServiceName": "zone",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "low",
"ResourceType": "Zone",
"ResourceGroup": "network",
"Description": "**Cloudflare zones** are assessed for **Email Obfuscation** (Scrape Shield) configuration by checking if it is enabled to protect email addresses on the website from **automated harvesting** by bots and spammers.",
"Risk": "Without **Email Obfuscation**, email addresses displayed on your website can be harvested by bots.\n- **Confidentiality**: harvested emails become targets for spam and phishing campaigns\n- **Integrity**: employees may fall victim to targeted social engineering attacks\n- **Availability**: increased spam volume can overwhelm email systems",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developers.cloudflare.com/waf/tools/scrape-shield/email-address-obfuscation/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Log in to the Cloudflare dashboard and select your account and domain\n2. Go to Scrape Shield (or Security > Settings in newer UI)\n3. Scroll to Email Address Obfuscation\n4. Toggle the setting to On\n5. Verify that email addresses still work correctly for human visitors",
"Terraform": "```hcl\n# Enable Email Obfuscation to protect against email harvesting\nresource \"cloudflare_zone_settings_override\" \"email_obfuscation\" {\n zone_id = \"<ZONE_ID>\"\n settings {\n email_obfuscation = \"on\" # Critical: hides email addresses from automated scrapers\n }\n}\n```"
},
"Recommendation": {
"Text": "Enable **Email Obfuscation** as part of anti-scraping protections.\n- Automatically encodes email addresses to prevent bot harvesting\n- Email addresses remain visible and clickable for human visitors\n- Works with mailto: links and plain text email addresses\n- Part of the Scrape Shield feature set for comprehensive protection",
"Url": "https://hub.prowler.com/checks/cloudflare/zone_email_obfuscation_enabled"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Email Obfuscation automatically hides email addresses from bots while keeping them visible and clickable for human visitors. May not work with JavaScript-rendered email addresses."
}

View File

@@ -0,0 +1,43 @@
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.zone.zone_client import zone_client
class zone_email_obfuscation_enabled(Check):
"""Ensure that Email Obfuscation is enabled for Cloudflare zones.
Email Obfuscation is part of Cloudflare's Scrape Shield suite that protects
email addresses displayed on websites from automated harvesting by bots and
spammers. It encrypts email addresses in the HTML source while keeping them
visible to human visitors, reducing spam and protecting user privacy.
"""
def execute(self) -> list[CheckReportCloudflare]:
"""Execute the Email Obfuscation enabled check.
Iterates through all Cloudflare zones and verifies that Email Obfuscation
is enabled. This feature helps prevent email harvesting by obfuscating
email addresses in the page source.
Returns:
A list of CheckReportCloudflare objects with PASS status if Email
Obfuscation is enabled, or FAIL status if it is disabled for the zone.
"""
findings = []
for zone in zone_client.zones.values():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=zone,
)
email_obfuscation = (zone.settings.email_obfuscation or "").lower()
if email_obfuscation == "on":
report.status = "PASS"
report.status_extended = (
f"Email Obfuscation is enabled for zone {zone.name}."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Email Obfuscation is not enabled for zone {zone.name}."
)
findings.append(report)
return findings

View File

@@ -8,6 +8,7 @@
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Zone",
"ResourceGroup": "network",
"Description": "**Cloudflare zones** are assessed for **HTTP Strict Transport Security (HSTS)** by checking if it is enabled with a `max-age` of at least **6 months** (15768000 seconds) and **includes subdomains** to instruct browsers to always use HTTPS across the entire domain.",
"Risk": "Without **HSTS**, browsers may initially connect over HTTP before redirecting to HTTPS.\n- **Confidentiality**: creates a window for SSL stripping attacks where attackers downgrade connections to unencrypted HTTP\n- **Integrity**: first request can be intercepted and modified before HTTPS redirect\n- **Session hijacking**: cookies and credentials may be captured during initial HTTP request",
"RelatedUrl": "",

View File

@@ -8,12 +8,12 @@
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Zone",
"ResourceGroup": "network",
"Description": "**Cloudflare zones** are assessed for **Always Use HTTPS** setting by checking if it is enabled to automatically redirect all **HTTP requests to HTTPS**, enforcing encrypted transport for all visitors.",
"Risk": "Without **automatic HTTPS redirects**, users may access resources over unencrypted HTTP.\n- **Confidentiality**: traffic can be intercepted and read by attackers on the network path\n- **Integrity**: HTTP responses can be modified in transit (content injection, malware insertion)\n- **Authentication**: session cookies and credentials may be transmitted in plaintext",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developers.cloudflare.com/ssl/edge-certificates/additional-options/always-use-https/",
"https://developers.cloudflare.com/terraform/tutorial/configure-https-settings/"
"https://developers.cloudflare.com/ssl/edge-certificates/additional-options/always-use-https/"
],
"Remediation": {
"Code": {

View File

@@ -8,6 +8,7 @@
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Zone",
"ResourceGroup": "network",
"Description": "**Cloudflare zones** are assessed for **minimum TLS version** configuration by checking if the version is set to at least `TLS 1.2` to ensure connections use **secure, modern cryptographic protocols**.",
"Risk": "Allowing **legacy TLS versions** (1.0, 1.1) exposes connections to known protocol vulnerabilities.\n- **Confidentiality**: BEAST, POODLE, and weak cipher suites can be exploited for traffic decryption\n- **Compliance**: TLS 1.0/1.1 are deprecated by PCI-DSS, NIST, and major browsers\n- **Integrity**: downgrade attacks can force weaker encryption that is susceptible to tampering",
"RelatedUrl": "",

View File

@@ -0,0 +1,36 @@
{
"Provider": "cloudflare",
"CheckID": "zone_record_caa_exists",
"CheckTitle": "CAA record exists with issue or issuewild tag",
"CheckType": [],
"ServiceName": "zone",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "low",
"ResourceType": "Zone",
"ResourceGroup": "network",
"Description": "**Cloudflare zones** are assessed for **CAA (Certificate Authority Authorization)** DNS records by checking if they exist with **`issue` or `issuewild` tags** that specify which **certificate authorities** are permitted to issue SSL/TLS certificates for the domain.",
"Risk": "Without **CAA** records or without `issue`/`issuewild` tags, any certificate authority can issue certificates for your domain.\n- **Confidentiality**: unauthorized certificates enable man-in-the-middle attacks\n- **Integrity**: attackers can impersonate your domain with fraudulently obtained certificates\n- **Trust**: CA compromise or social engineering can result in unauthorized certificate issuance\n- **Missing tags**: CAA records without `issue` or `issuewild` tags (e.g., only `iodef`) do not restrict certificate issuance",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developers.cloudflare.com/ssl/edge-certificates/caa-records/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Log in to the Cloudflare dashboard and select your account and domain\n2. Go to DNS > Records\n3. Click Add record\n4. Select CAA as the record type\n5. Enter @ for the Name field\n6. Set flags to 0, tag to issue, and value to your authorized CA (e.g., letsencrypt.org)\n7. Click Save\n8. Optionally add issuewild records to control wildcard certificate issuance",
"Terraform": "```hcl\n# Create CAA record to restrict certificate issuance\nresource \"cloudflare_record\" \"caa_issue\" {\n zone_id = \"<ZONE_ID>\"\n name = \"@\"\n type = \"CAA\"\n data {\n flags = \"0\"\n tag = \"issue\" # Restricts which CAs can issue regular certificates\n value = \"letsencrypt.org\" # Specify your authorized certificate authority\n }\n}\n\n# Create CAA record to restrict wildcard certificate issuance\nresource \"cloudflare_record\" \"caa_issuewild\" {\n zone_id = \"<ZONE_ID>\"\n name = \"@\"\n type = \"CAA\"\n data {\n flags = \"0\"\n tag = \"issuewild\" # Restricts which CAs can issue wildcard certificates\n value = \"letsencrypt.org\"\n }\n}\n```"
},
"Recommendation": {
"Text": "Add **CAA records** with `issue` or `issuewild` tags specifying authorized certificate authorities.\n- `issue` tag: authorizes CAs for regular (non-wildcard) certificates\n- `issuewild` tag: authorizes CAs for wildcard certificates specifically\n- Use `issue \";\"` to block all certificate issuance (for domains that should never have certificates)\n- Use `iodef` tag to receive reports of policy violations (optional, for monitoring)\n- All CAs are required to check CAA records before issuing certificates",
"Url": "https://hub.prowler.com/checks/cloudflare/zone_record_caa_exists"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "CAA records help prevent unauthorized SSL/TLS certificate issuance. This check verifies both existence and that the record contains issue or issuewild tags. Records with only iodef tag will fail this check."
}

View File

@@ -0,0 +1,82 @@
import re
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.dns.dns_client import dns_client
from prowler.providers.cloudflare.services.zone.zone_client import zone_client
class zone_record_caa_exists(Check):
"""Ensure that CAA record exists with certificate issuance restrictions.
CAA (Certificate Authority Authorization) is a DNS record type that allows
domain owners to specify which certificate authorities (CAs) are permitted
to issue SSL/TLS certificates for their domain. This check verifies that CAA
records exist with "issue" or "issuewild" tags that explicitly authorize
specific CAs, preventing unauthorized certificate issuance and reducing the
risk of man-in-the-middle attacks from fraudulent certificates.
"""
def execute(self) -> list[CheckReportCloudflare]:
"""Execute the CAA record exists check.
Iterates through all Cloudflare zones and verifies CAA configuration.
The check validates two conditions:
1. CAA records exist for the zone
2. At least one record has an "issue" or "issuewild" tag specifying authorized CAs
Returns:
A list of CheckReportCloudflare objects with PASS status if CAA
records with issuance restrictions exist, or FAIL status if no CAA
records are found or they lack proper issue/issuewild tags.
"""
findings = []
for zone in zone_client.zones.values():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=zone,
)
# CAA records restrict which CAs can issue certificates
caa_records = [
record
for record in dns_client.records
if record.zone_id == zone.id and record.type == "CAA"
]
if not caa_records:
report.status = "FAIL"
report.status_extended = f"No CAA record found for zone {zone.name}."
else:
# Check if CAA records have issue or issuewild tags with CA specified
issue_records = [
record
for record in caa_records
if self._has_issue_tag_with_ca(record.content)
]
records_str = ", ".join(record.name for record in caa_records)
if issue_records:
report.status = "PASS"
report.status_extended = f"CAA record with certificate issuance restrictions exists for zone {zone.name}: {records_str}."
else:
report.status = "FAIL"
report.status_extended = f"CAA record exists for zone {zone.name} but does not specify authorized CAs with issue or issuewild tags: {records_str}."
findings.append(report)
return findings
def _has_issue_tag_with_ca(self, content: str) -> bool:
"""Check if CAA record has issue or issuewild tag with a CA specified.
CAA content format: "flags tag value" e.g., "0 issue letsencrypt.org"
"""
# Strip quotes that may be present from Cloudflare API
content_lower = content.strip('"').lower()
# Match issue or issuewild tag followed by a value (CA name or ";" to block all)
# Format: "0 issue letsencrypt.org" or "0 issuewild ;" or "0 issue \"digicert.com\""
issue_match = re.search(r"\bissue\b", content_lower)
issuewild_match = re.search(r"\bissuewild\b", content_lower)
return bool(issue_match or issuewild_match)

View File

@@ -0,0 +1,36 @@
{
"Provider": "cloudflare",
"CheckID": "zone_record_dkim_exists",
"CheckTitle": "DKIM record exists with valid public key",
"CheckType": [],
"ServiceName": "zone",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Zone",
"ResourceGroup": "network",
"Description": "**Cloudflare zones** are assessed for **DKIM (DomainKeys Identified Mail)** records by checking if TXT records exist at `*._domainkey` subdomains containing a **cryptographically valid public key** in the `p=` parameter used to **verify email signatures**.",
"Risk": "Without **DKIM** or with a revoked/empty public key, email recipients cannot verify that messages were sent by authorized servers.\n- **Confidentiality**: attackers can forge emails appearing to come from your domain\n- **Integrity**: no cryptographic proof that email content hasn't been modified in transit\n- **Reputation**: DMARC policies relying on DKIM will fail, affecting email deliverability\n- **Revoked keys**: A DKIM record with `p=` empty (e.g., `p=;`) indicates a revoked key that cannot authenticate emails",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developers.cloudflare.com/dns/manage-dns-records/how-to/email-records/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Generate DKIM keys using your email provider or mail server\n2. Log in to the Cloudflare dashboard and select your account and domain\n3. Go to DNS > Records\n4. Click Add record\n5. Select TXT as the record type\n6. Enter selector._domainkey for the Name field (e.g., google._domainkey)\n7. Enter the DKIM public key record provided by your email service (must include p= with actual key)\n8. Click Save",
"Terraform": "```hcl\n# Create DKIM record for email authentication\nresource \"cloudflare_record\" \"dkim\" {\n zone_id = \"<ZONE_ID>\"\n name = \"google._domainkey\" # Selector varies by email provider\n type = \"TXT\"\n content = \"v=DKIM1; k=rsa; p=MIGfMA0GCSqGSIb3DQEBAQUAA4...\" # Must contain valid public key\n ttl = 3600\n}\n```"
},
"Recommendation": {
"Text": "Configure **DKIM records** with valid public keys from your email provider.\n- Each email service requires its own DKIM selector (e.g., google._domainkey for Google Workspace)\n- The `p=` parameter must contain the actual public key, not be empty\n- An empty `p=` value indicates a revoked key and will fail this check\n- DKIM works alongside SPF and DMARC for comprehensive email authentication\n- Rotate DKIM keys periodically for enhanced security\n- Use 2048-bit keys for stronger cryptographic protection",
"Url": "https://hub.prowler.com/checks/cloudflare/zone_record_dkim_exists"
}
},
"Categories": [
"email-security"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "DKIM records are TXT records at *._domainkey subdomains starting with 'v=DKIM1'. This check uses cryptographic validation to verify that the p= parameter contains a real DER-encoded public key, not just non-empty Base64. Revoked keys, invalid Base64, or malformed keys will fail this check."
}

View File

@@ -0,0 +1,116 @@
import base64
import re
from cryptography.hazmat.primitives.serialization import load_der_public_key
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.dns.dns_client import dns_client
from prowler.providers.cloudflare.services.zone.zone_client import zone_client
class zone_record_dkim_exists(Check):
"""Ensure that DKIM record exists with valid public key for Cloudflare zones.
DKIM (DomainKeys Identified Mail) is an email authentication method that allows
the receiver to verify that an email was sent by the domain owner and was not
modified in transit. This check verifies that DKIM records exist at *._domainkey
subdomains containing "v=DKIM1" with a cryptographically valid public key in the
p= parameter that can be used to verify email signatures.
"""
def execute(self) -> list[CheckReportCloudflare]:
"""Execute the DKIM record exists check.
Iterates through all Cloudflare zones and verifies DKIM configuration.
The check validates three conditions:
1. A DKIM record exists (TXT record at *._domainkey with "v=DKIM1")
2. The record contains a p= parameter with a public key
3. The public key is cryptographically valid (valid Base64 and DER format)
Returns:
A list of CheckReportCloudflare objects with PASS status if a DKIM
record with valid public key exists, or FAIL status if no DKIM record
is found or the public key is invalid/missing.
"""
findings = []
for zone in zone_client.zones.values():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=zone,
)
# DKIM records are TXT records at *._domainkey subdomain containing "v=DKIM1"
dkim_records = [
record
for record in dns_client.records
if record.zone_id == zone.id
and record.type == "TXT"
and record.name
and "_domainkey" in record.name
and "V=DKIM1"
in record.content.replace('" "', "").replace('"', "").upper()
]
if not dkim_records:
report.status = "FAIL"
report.status_extended = f"No DKIM record found for zone {zone.name}."
else:
# Check if DKIM records have a valid public key
valid_key_records = [
record
for record in dkim_records
if self._has_valid_public_key(record.content)
]
records_str = ", ".join(record.name for record in dkim_records)
if valid_key_records:
report.status = "PASS"
report.status_extended = f"DKIM record with valid public key exists for zone {zone.name}: {records_str}."
else:
report.status = "FAIL"
report.status_extended = f"DKIM record exists for zone {zone.name} but has invalid or missing public key: {records_str}."
findings.append(report)
return findings
def _has_valid_public_key(self, content: str) -> bool:
"""Check if DKIM record has a valid public key.
Validates that:
1. The p= parameter exists and is not empty
2. The key is valid Base64
3. The key can be loaded as a valid DER-encoded public key
"""
# Cloudflare API may return TXT records with quotes, and long records
# may be split into multiple quoted strings like: "part1" "part2"
# First remove '" "' to join split parts, then remove remaining quotes
content = content.replace('" "', "").replace('"', "")
# Extract the public key value from p= parameter
match = re.search(r"p\s*=\s*([^;\s]*)", content, re.IGNORECASE)
if not match:
return False
key_value = match.group(1)
# Empty key means revoked
if not key_value:
return False
try:
# Add padding if necessary for Base64 decoding
padding = 4 - (len(key_value) % 4)
if padding != 4:
key_value += "=" * padding
# Decode Base64 to get DER-encoded key
der_key = base64.b64decode(key_value, validate=True)
# Try to load as a public key using cryptography library
load_der_public_key(der_key)
return True
except Exception:
return False

View File

@@ -0,0 +1,36 @@
{
"Provider": "cloudflare",
"CheckID": "zone_record_dmarc_exists",
"CheckTitle": "DMARC record exists with enforcement policy (p=reject or p=quarantine)",
"CheckType": [],
"ServiceName": "zone",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Zone",
"ResourceGroup": "network",
"Description": "**Cloudflare zones** are assessed for **DMARC (Domain-based Message Authentication, Reporting, and Conformance)** records by checking if a TXT record exists at `_dmarc` subdomain with an **enforcement policy (`p=reject` or `p=quarantine`)** to actively block or quarantine spoofed emails.",
"Risk": "Without **DMARC** or with a monitoring-only policy (`p=none`), there is no active protection against email spoofing.\n- **Confidentiality**: attackers can spoof emails for phishing campaigns to steal credentials\n- **Integrity**: no visibility into email authentication failures or abuse attempts\n- **Reputation**: domain reputation damage from spoofed emails sent in your name\n- **Monitoring-only policy**: `p=none` only generates reports but does not block or quarantine spoofed emails, providing no real protection",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developers.cloudflare.com/dns/manage-dns-records/how-to/email-records/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Log in to the Cloudflare dashboard and select your account and domain\n2. Go to DNS > Records\n3. Click Add record\n4. Select TXT as the record type\n5. Enter _dmarc for the Name field\n6. Enter your DMARC policy with enforcement (e.g., v=DMARC1; p=reject; rua=mailto:dmarc@example.com)\n7. Click Save",
"Terraform": "```hcl\n# Create DMARC record with enforcement policy\nresource \"cloudflare_record\" \"dmarc\" {\n zone_id = \"<ZONE_ID>\"\n name = \"_dmarc\"\n type = \"TXT\"\n content = \"v=DMARC1; p=reject; rua=mailto:dmarc@example.com\" # Use p=reject or p=quarantine for enforcement\n ttl = 3600\n}\n```"
},
"Recommendation": {
"Text": "Implement **DMARC** with an enforcement policy (`p=reject` or `p=quarantine`).\n- `p=reject`: receiving servers should reject emails that fail authentication\n- `p=quarantine`: receiving servers should send suspicious emails to spam folder\n- `p=none`: only monitoring, provides no protection (will fail this check)\n- Ensure SPF and DKIM are properly configured before enabling enforcement\n- Configure `rua` to receive aggregate reports on authentication results\n- Use `pct` parameter to gradually roll out enforcement (e.g., `pct=10` for 10% of emails)",
"Url": "https://hub.prowler.com/checks/cloudflare/zone_record_dmarc_exists"
}
},
"Categories": [
"email-security"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "DMARC records are TXT records at _dmarc subdomain starting with 'v=DMARC1'. This check verifies both existence and enforcement policy (p=reject or p=quarantine). Monitoring-only policy (p=none) will fail this check."
}

View File

@@ -0,0 +1,88 @@
import re
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.dns.dns_client import dns_client
from prowler.providers.cloudflare.services.zone.zone_client import zone_client
class zone_record_dmarc_exists(Check):
"""Ensure that DMARC record exists with enforcement policy for Cloudflare zones.
DMARC (Domain-based Message Authentication, Reporting, and Conformance) is an
email authentication protocol that builds on SPF and DKIM. It allows domain
owners to specify how receiving mail servers should handle emails that fail
authentication checks. This check verifies that a DMARC record exists at the
_dmarc subdomain with an enforcement policy (p=reject or p=quarantine) to
actively block or quarantine spoofed emails, not just monitor them (p=none).
"""
def execute(self) -> list[CheckReportCloudflare]:
"""Execute the DMARC record exists check.
Iterates through all Cloudflare zones and verifies DMARC configuration.
The check validates two conditions:
1. A DMARC record exists (TXT record at _dmarc subdomain with "v=DMARC1")
2. The record uses an enforcement policy (p=reject or p=quarantine)
Returns:
A list of CheckReportCloudflare objects with PASS status if a DMARC
record with enforcement policy exists, or FAIL status if no DMARC
record is found or it uses monitoring-only policy (p=none).
"""
findings = []
for zone in zone_client.zones.values():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=zone,
)
# DMARC records are TXT records at _dmarc subdomain starting with "v=DMARC1"
dmarc_records = [
record
for record in dns_client.records
if record.zone_id == zone.id
and record.type == "TXT"
and record.name
and record.name.startswith("_dmarc")
and "V=DMARC1" in record.content.strip('"').upper()
]
if not dmarc_records:
report.status = "FAIL"
report.status_extended = f"No DMARC record found for zone {zone.name}."
else:
# Check if DMARC uses enforcement policy (p=reject or p=quarantine) vs monitoring (p=none)
enforcement_records = [
record
for record in dmarc_records
if self._get_policy_value(record.content)
in ("reject", "quarantine")
]
records_str = ", ".join(record.name for record in dmarc_records)
if enforcement_records:
# Get the actual policy value from the first enforcement record
policy = self._get_policy_value(enforcement_records[0].content)
report.status = "PASS"
report.status_extended = f"DMARC record with enforcement policy p={policy} exists for zone {zone.name}: {records_str}."
else:
# Get the actual policy value to show in the message
policy = self._get_policy_value(dmarc_records[0].content) or "none"
report.status = "FAIL"
report.status_extended = f"DMARC record exists for zone {zone.name} but uses monitoring-only policy p={policy}: {records_str}."
findings.append(report)
return findings
def _get_policy_value(self, content: str) -> str | None:
"""Extract the DMARC policy value (reject, quarantine, or none)."""
# Strip quotes that may be present from Cloudflare API
content_clean = content.strip('"')
# Match p=<value> (with optional spaces around =)
match = re.search(r"p\s*=\s*(\w+)", content_clean, re.IGNORECASE)
if match:
return match.group(1).lower()
return None

View File

@@ -0,0 +1,36 @@
{
"Provider": "cloudflare",
"CheckID": "zone_record_spf_exists",
"CheckTitle": "SPF record exists with strict policy (-all)",
"CheckType": [],
"ServiceName": "zone",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Zone",
"ResourceGroup": "network",
"Description": "**Cloudflare zones** are assessed for **SPF (Sender Policy Framework)** records by checking if a TXT record exists that specifies which mail servers are **authorized to send email** on behalf of the domain, and verifies that the record uses a **strict policy (`-all`)** to reject unauthorized senders.",
"Risk": "Without **SPF** or with a permissive policy (`~all`, `?all`, `+all`), attackers can forge emails appearing to come from your domain.\n- **Confidentiality**: phishing attacks can harvest sensitive information from recipients who trust spoofed emails\n- **Integrity**: brand reputation damage from fraudulent emails sent in your name\n- **Availability**: email deliverability issues as receiving servers may reject or quarantine legitimate emails\n- **Permissive policies**: `~all` (softfail) only marks emails as suspicious but does not reject them, `?all` (neutral) provides no protection",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developers.cloudflare.com/dns/manage-dns-records/how-to/email-records/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Log in to the Cloudflare dashboard and select your account and domain\n2. Go to DNS > Records\n3. Click Add record\n4. Select TXT as the record type\n5. Enter @ for the Name field\n6. Enter your SPF record with strict policy (e.g., v=spf1 include:_spf.google.com -all)\n7. Click Save",
"Terraform": "```hcl\n# Create SPF record for email authentication with strict policy\nresource \"cloudflare_record\" \"spf\" {\n zone_id = \"<ZONE_ID>\"\n name = \"@\"\n type = \"TXT\"\n content = \"v=spf1 include:_spf.google.com -all\" # Use -all for strict enforcement\n ttl = 3600\n}\n```"
},
"Recommendation": {
"Text": "Configure **SPF records** with strict policy (`-all`) listing authorized mail servers.\n- SPF records start with `v=spf1` and define authorized senders\n- Use `-all` (hardfail) to reject emails from unauthorized servers\n- Avoid `~all` (softfail) in production as it only marks suspicious emails but does not reject them\n- Use `include:` to authorize third-party mail services\n- Combine with **DKIM** and **DMARC** for comprehensive email authentication\n- Test SPF records using online validators before deployment",
"Url": "https://hub.prowler.com/checks/cloudflare/zone_record_spf_exists"
}
},
"Categories": [
"email-security"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "SPF records start with 'v=spf1' and define authorized mail servers. This check verifies both existence and strict policy (-all). Permissive policies like ~all (softfail) or ?all (neutral) will fail this check."
}

View File

@@ -0,0 +1,68 @@
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.dns.dns_client import dns_client
from prowler.providers.cloudflare.services.zone.zone_client import zone_client
class zone_record_spf_exists(Check):
"""Ensure that SPF record exists with strict policy for Cloudflare zones.
SPF (Sender Policy Framework) is an email authentication method that specifies
which mail servers are authorized to send email on behalf of the domain. This
check verifies that an SPF record exists as a TXT record starting with "v=spf1"
and uses the strict policy qualifier "-all" to instruct receiving servers to
reject emails from unauthorized sources.
"""
def execute(self) -> list[CheckReportCloudflare]:
"""Execute the SPF record exists check.
Iterates through all Cloudflare zones and verifies SPF configuration.
The check validates two conditions:
1. An SPF record exists (TXT record starting with "v=spf1")
2. The record uses strict policy "-all" (not ~all, ?all, or +all)
Returns:
A list of CheckReportCloudflare objects with PASS status if an SPF
record with strict policy exists, or FAIL status if no SPF record
is found or it uses a permissive policy.
"""
findings = []
for zone in zone_client.zones.values():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=zone,
)
# SPF records are TXT records starting with "v=spf1"
spf_records = [
record
for record in dns_client.records
if record.zone_id == zone.id
and record.type == "TXT"
and record.content.strip('"').startswith("v=spf1")
]
if not spf_records:
report.status = "FAIL"
report.status_extended = f"No SPF record found for zone {zone.name}."
else:
# Check if SPF uses strict policy (-all) vs permissive (~all, ?all, +all)
strict_records = [
record
for record in spf_records
if record.content.strip('"').rstrip().endswith("-all")
]
records_str = ", ".join(record.name for record in spf_records)
if strict_records:
report.status = "PASS"
report.status_extended = f"SPF record with strict policy -all exists for zone {zone.name}: {records_str}."
else:
report.status = "FAIL"
report.status_extended = f"SPF record exists for zone {zone.name} but does not use strict policy -all: {records_str}."
findings.append(report)
return findings

View File

@@ -0,0 +1,37 @@
{
"Provider": "cloudflare",
"CheckID": "zone_security_under_attack_disabled",
"CheckTitle": "Under Attack Mode is disabled during normal operations",
"CheckType": [],
"ServiceName": "zone",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "low",
"ResourceType": "Zone",
"ResourceGroup": "network",
"Description": "**Cloudflare zones** are assessed for **Under Attack Mode** configuration by checking if it is disabled during normal operations, as this mode performs additional security checks including an **interstitial JavaScript challenge page** that significantly impacts user experience.",
"Risk": "Keeping **Under Attack Mode** permanently enabled causes operational issues.\n- **Availability**: all visitors face a 5-second interstitial challenge page before accessing the site\n- **Accessibility**: visitors without JavaScript support cannot access the site at all\n- **User Experience**: legitimate users experience unnecessary delays and third-party analytics show degraded performance",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developers.cloudflare.com/fundamentals/reference/under-attack-mode/",
"https://developers.cloudflare.com/waf/tools/security-level/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Log in to the Cloudflare dashboard and select your account and domain\n2. Go to Security > Settings\n3. Under 'Under Attack Mode', toggle it OFF\n4. Consider setting Security Level to 'High' for continued protection without the interstitial",
"Terraform": "```hcl\n# Set security level to high instead of under_attack\nresource \"cloudflare_zone_settings_override\" \"security_settings\" {\n zone_id = \"<ZONE_ID>\"\n settings {\n security_level = \"high\" # Use high instead of under_attack for normal operations\n }\n}\n```"
},
"Recommendation": {
"Text": "Disable **Under Attack Mode** when not actively under a DDoS attack.\n- Use it only as a **last resort** during active layer 7 DDoS attacks\n- For ongoing protection, use **Security Level** settings (Low, Medium, High)\n- Configure specific **WAF rules** for targeted protection without impacting all users",
"Url": "https://hub.prowler.com/checks/cloudflare/zone_security_under_attack_disabled"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Under Attack Mode is designed as a temporary measure during active attacks. The interstitial challenge page validates visitors using JavaScript, blocking automated attacks while allowing legitimate users through after a brief delay."
}

View File

@@ -0,0 +1,47 @@
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.zone.zone_client import zone_client
class zone_security_under_attack_disabled(Check):
"""Ensure that Under Attack Mode is disabled during normal operations.
Under Attack Mode is a DDoS mitigation feature that performs additional
security checks including an interstitial JavaScript challenge page for all
visitors. While effective during active attacks, it significantly impacts
user experience and should only be enabled temporarily during actual DDoS
incidents, not as a permanent security measure.
"""
def execute(self) -> list[CheckReportCloudflare]:
"""Execute the Under Attack Mode disabled check.
Iterates through all Cloudflare zones and verifies that the security
level is not set to "under_attack". Having this mode permanently enabled
indicates either an ongoing attack or misconfiguration that degrades
user experience unnecessarily.
Returns:
A list of CheckReportCloudflare objects with PASS status if Under
Attack Mode is disabled, or FAIL status if it is currently enabled.
"""
findings = []
for zone in zone_client.zones.values():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=zone,
)
security_level = (zone.settings.security_level or "").lower()
if security_level == "under_attack":
report.status = "FAIL"
report.status_extended = (
f"Zone {zone.name} has Under Attack Mode enabled."
)
else:
report.status = "PASS"
report.status_extended = (
f"Zone {zone.name} does not have Under Attack Mode enabled."
)
findings.append(report)
return findings

View File

@@ -16,12 +16,13 @@ class Zone(CloudflareService):
self._list_zones()
self._get_zones_settings()
self._get_zones_dnssec()
self._get_zones_universal_ssl()
def _list_zones(self) -> None:
"""List all Cloudflare zones with their basic information."""
logger.info("Zone - Listing zones...")
audited_accounts = self.provider.identity.audited_accounts
filter_zone = self.provider.filter_zone
filter_zones = self.provider.filter_zones
seen_zone_ids: set[str] = set()
try:
@@ -43,9 +44,9 @@ class Zone(CloudflareService):
# Apply zone filter if specified via --region
if (
filter_zone
and zone_id not in filter_zone
and zone_name not in filter_zone
filter_zones
and zone_id not in filter_zones
and zone_name not in filter_zones
):
continue
@@ -107,6 +108,20 @@ class Zone(CloudflareService):
f"{zone.id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _get_zones_universal_ssl(self) -> None:
"""Get Universal SSL settings for all zones."""
logger.info("Zones - Getting Universal SSL settings...")
for zone in self.zones.values():
try:
universal_ssl = self.client.ssl.universal.settings.get(zone_id=zone.id)
zone.settings.universal_ssl_enabled = getattr(
universal_ssl, "enabled", False
)
except Exception as error:
logger.error(
f"{zone.id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _get_zone_setting(self, zone_id: str, setting_id: str):
"""Get a single zone setting by ID."""
try:
@@ -127,7 +142,6 @@ class Zone(CloudflareService):
"ssl",
"tls_1_3",
"automatic_https_rewrites",
"universal_ssl",
"security_header",
"waf",
"security_level",
@@ -148,7 +162,6 @@ class Zone(CloudflareService):
ssl_encryption_mode=settings.get("ssl"),
tls_1_3=settings.get("tls_1_3"),
automatic_https_rewrites=settings.get("automatic_https_rewrites"),
universal_ssl=settings.get("universal_ssl"),
strict_transport_security=self._get_strict_transport_security(
settings.get("security_header")
),
@@ -210,7 +223,7 @@ class CloudflareZoneSettings(BaseModel):
ssl_encryption_mode: Optional[str] = None
tls_1_3: Optional[str] = None
automatic_https_rewrites: Optional[str] = None
universal_ssl: Optional[str] = None
universal_ssl_enabled: bool = False
# HSTS settings
strict_transport_security: StrictTransportSecurity = Field(
default_factory=StrictTransportSecurity

View File

@@ -8,6 +8,7 @@
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Zone",
"ResourceGroup": "network",
"Description": "**Cloudflare zones** are assessed for **SSL/TLS encryption mode** by checking if the mode is set to `Full (Strict)` to ensure **end-to-end encryption** with certificate validation.",
"Risk": "Without **strict SSL mode**, traffic between Cloudflare and origin may use unvalidated or unencrypted connections.\n- **Confidentiality**: sensitive data can be intercepted in transit via man-in-the-middle attacks\n- **Integrity**: responses can be modified without detection between Cloudflare and origin\n- **Compliance**: may violate PCI-DSS, HIPAA, and other regulatory requirements for encrypted transport",
"RelatedUrl": "",

View File

@@ -0,0 +1,36 @@
{
"Provider": "cloudflare",
"CheckID": "zone_tls_1_3_enabled",
"CheckTitle": "TLS 1.3 is enabled",
"CheckType": [],
"ServiceName": "zone",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Zone",
"ResourceGroup": "network",
"Description": "**Cloudflare zones** are assessed for **TLS 1.3** configuration by checking if it is enabled to benefit from **improved security** through simplified cipher suites and **faster handshakes** with zero round-trip time resumption.",
"Risk": "Without **TLS 1.3**, connections use older TLS versions with less secure characteristics.\n- **Confidentiality**: legacy TLS versions have more complex cipher negotiations that may expose weaknesses\n- **Integrity**: older protocols lack protection against downgrade attacks that TLS 1.3 provides\n- **Performance**: slower handshakes impact user experience and increase latency",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developers.cloudflare.com/ssl/edge-certificates/additional-options/tls-13/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Log in to the Cloudflare dashboard and select your account and domain\n2. Go to SSL/TLS > Edge Certificates\n3. Scroll to TLS 1.3\n4. Toggle the setting to On\n5. Verify that your clients support TLS 1.3 (all modern browsers do)",
"Terraform": "```hcl\n# Enable TLS 1.3 for improved security and performance\nresource \"cloudflare_zone_settings_override\" \"tls_settings\" {\n zone_id = \"<ZONE_ID>\"\n settings {\n tls_1_3 = \"on\" # Critical: enables most secure TLS version with faster handshakes\n }\n}\n```"
},
"Recommendation": {
"Text": "Enable **TLS 1.3** to provide the most secure and efficient TLS connections.\n- All modern browsers support TLS 1.3 ensuring broad compatibility\n- TLS 1.3 removes obsolete cryptographic algorithms\n- Zero round-trip time (0-RTT) resumption improves performance\n- Combine with minimum TLS version set to 1.2 or higher",
"Url": "https://hub.prowler.com/checks/cloudflare/zone_tls_1_3_enabled"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "TLS 1.3 is enabled by default on Cloudflare but should be verified. It provides security improvements and performance benefits through simplified handshakes."
}

View File

@@ -0,0 +1,39 @@
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.zone.zone_client import zone_client
class zone_tls_1_3_enabled(Check):
"""Ensure that TLS 1.3 is enabled for Cloudflare zones.
TLS 1.3 provides improved security through simplified cipher suites and
faster handshakes with zero round-trip time (0-RTT) resumption. It removes
outdated cryptographic algorithms, reduces handshake latency, and provides
better forward secrecy compared to previous TLS versions.
"""
def execute(self) -> list[CheckReportCloudflare]:
"""Execute the TLS 1.3 enabled check.
Iterates through all Cloudflare zones and verifies that TLS 1.3 is
enabled. The check accepts both "on" (standard TLS 1.3) and "zrt"
(TLS 1.3 with 0-RTT) as valid enabled states.
Returns:
A list of CheckReportCloudflare objects with PASS status if TLS 1.3
is enabled, or FAIL status if it is disabled for the zone.
"""
findings = []
for zone in zone_client.zones.values():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=zone,
)
tls_1_3 = (zone.settings.tls_1_3 or "").lower()
if tls_1_3 in ["on", "zrt"]:
report.status = "PASS"
report.status_extended = f"TLS 1.3 is enabled for zone {zone.name}."
else:
report.status = "FAIL"
report.status_extended = f"TLS 1.3 is not enabled for zone {zone.name}."
findings.append(report)
return findings

View File

@@ -0,0 +1,36 @@
{
"Provider": "cloudflare",
"CheckID": "zone_universal_ssl_enabled",
"CheckTitle": "Universal SSL is enabled",
"CheckType": [],
"ServiceName": "zone",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Zone",
"ResourceGroup": "network",
"Description": "**Cloudflare zones** are assessed for **Universal SSL** configuration by checking if it is enabled to provide **free SSL/TLS certificates** for the domain and its subdomains, enabling secure HTTPS connections.",
"Risk": "Without **Universal SSL**, visitors cannot establish HTTPS connections to your site.\n- **Confidentiality**: all traffic is unencrypted and vulnerable to interception and eavesdropping\n- **Integrity**: HTTP responses can be modified in transit by attackers (content injection, malware)\n- **Trust**: browsers display security warnings degrading user trust and experience",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developers.cloudflare.com/ssl/edge-certificates/universal-ssl/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Log in to the Cloudflare dashboard and select your account and domain\n2. Go to SSL/TLS > Edge Certificates\n3. Scroll to Universal SSL\n4. Ensure the status shows Active\n5. If disabled, click Enable Universal SSL\n6. Wait for certificate issuance (typically within 24 hours for new domains)",
"Terraform": "```hcl\n# Note: Universal SSL is enabled by default on Cloudflare\n# To explicitly manage SSL settings, use zone_settings_override\nresource \"cloudflare_zone_settings_override\" \"ssl_settings\" {\n zone_id = \"<ZONE_ID>\"\n settings {\n ssl = \"full\" # Critical: enables SSL/TLS encryption\n }\n}\n```"
},
"Recommendation": {
"Text": "Enable **Universal SSL** to provide HTTPS capability for your domain.\n- Universal SSL is the foundation for transport security\n- Required before implementing HSTS or other HTTPS-dependent features\n- Cloudflare automatically renews certificates before expiration\n- Consider upgrading to Advanced Certificate Manager for additional control",
"Url": "https://hub.prowler.com/checks/cloudflare/zone_universal_ssl_enabled"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Universal SSL is enabled by default for all Cloudflare zones. If it was previously disabled, re-enabling may take up to 24 hours for certificate issuance."
}

View File

@@ -0,0 +1,42 @@
from prowler.lib.check.models import Check, CheckReportCloudflare
from prowler.providers.cloudflare.services.zone.zone_client import zone_client
class zone_universal_ssl_enabled(Check):
"""Ensure that Universal SSL is enabled for Cloudflare zones.
Universal SSL provides free SSL/TLS certificates for the domain and its
subdomains, enabling secure HTTPS connections without requiring manual
certificate management. This feature automatically provisions and renews
certificates, ensuring continuous protection for web traffic.
"""
def execute(self) -> list[CheckReportCloudflare]:
"""Execute the Universal SSL enabled check.
Iterates through all Cloudflare zones and verifies that Universal SSL
is enabled. Universal SSL provides automatic certificate provisioning
and management for the zone and its subdomains.
Returns:
A list of CheckReportCloudflare objects with PASS status if Universal
SSL is enabled, or FAIL status if it is disabled for the zone.
"""
findings = []
for zone in zone_client.zones.values():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=zone,
)
if zone.settings.universal_ssl_enabled:
report.status = "PASS"
report.status_extended = (
f"Universal SSL is enabled for zone {zone.name}."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Universal SSL is not enabled for zone {zone.name}."
)
findings.append(report)
return findings

View File

@@ -169,7 +169,7 @@ class TestCloudflareProvider:
with pytest.raises(CloudflareCredentialsError):
CloudflareProvider()
def test_cloudflare_provider_with_filter_zone(self):
def test_cloudflare_provider_with_filter_zones(self):
with (
patch(
"prowler.providers.cloudflare.cloudflare_provider.CloudflareProvider.setup_session",
@@ -196,10 +196,10 @@ class TestCloudflareProvider:
),
),
):
filter_zone = ["zone1", "zone2"]
provider = CloudflareProvider(filter_zone=filter_zone)
filter_zones = ["zone1", "zone2"]
provider = CloudflareProvider(filter_zones=filter_zones)
assert provider.filter_zone == set(filter_zone)
assert provider.filter_zones == set(filter_zones)
def test_cloudflare_provider_properties(self):
with (

View File

@@ -0,0 +1,119 @@
from typing import Optional
from pydantic import BaseModel
from tests.providers.cloudflare.cloudflare_fixtures import ZONE_ID, ZONE_NAME
class CloudflareDNSRecord(BaseModel):
"""Cloudflare DNS record representation for testing."""
id: str
zone_id: str
zone_name: str
name: Optional[str] = None
type: Optional[str] = None
content: str = ""
ttl: Optional[int] = None
proxied: bool = False
class TestDNSService:
def test_cloudflare_dns_record_model(self):
record = CloudflareDNSRecord(
id="record-123",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name="www.example.com",
type="A",
content="192.0.2.1",
ttl=3600,
proxied=True,
)
assert record.id == "record-123"
assert record.zone_id == ZONE_ID
assert record.zone_name == ZONE_NAME
assert record.name == "www.example.com"
assert record.type == "A"
assert record.content == "192.0.2.1"
assert record.ttl == 3600
assert record.proxied is True
def test_cloudflare_dns_record_defaults(self):
record = CloudflareDNSRecord(
id="record-123",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
)
assert record.id == "record-123"
assert record.zone_id == ZONE_ID
assert record.zone_name == ZONE_NAME
assert record.name is None
assert record.type is None
assert record.content == ""
assert record.ttl is None
assert record.proxied is False
def test_cloudflare_dns_record_txt(self):
record = CloudflareDNSRecord(
id="record-txt",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=ZONE_NAME,
type="TXT",
content="v=spf1 include:_spf.google.com ~all",
ttl=1,
proxied=False,
)
assert record.type == "TXT"
assert "v=spf1" in record.content
assert record.proxied is False
def test_cloudflare_dns_record_cname(self):
record = CloudflareDNSRecord(
id="record-cname",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name="www.example.com",
type="CNAME",
content="example.com",
ttl=3600,
proxied=True,
)
assert record.type == "CNAME"
assert record.content == "example.com"
assert record.proxied is True
def test_cloudflare_dns_record_mx(self):
record = CloudflareDNSRecord(
id="record-mx",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=ZONE_NAME,
type="MX",
content="10 mail.example.com",
ttl=3600,
proxied=False,
)
assert record.type == "MX"
assert "mail.example.com" in record.content
def test_cloudflare_dns_record_caa(self):
record = CloudflareDNSRecord(
id="record-caa",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=ZONE_NAME,
type="CAA",
content='0 issue "letsencrypt.org"',
ttl=3600,
proxied=False,
)
assert record.type == "CAA"
assert "letsencrypt.org" in record.content

View File

@@ -0,0 +1,143 @@
from unittest import mock
from prowler.providers.cloudflare.services.zone.zone_service import (
CloudflareZone,
CloudflareZoneSettings,
)
from tests.providers.cloudflare.cloudflare_fixtures import (
ZONE_ID,
ZONE_NAME,
set_mocked_cloudflare_provider,
)
class Test_zone_automatic_https_rewrites_enabled:
def test_no_zones(self):
zone_client = mock.MagicMock
zone_client.zones = {}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_automatic_https_rewrites_enabled.zone_automatic_https_rewrites_enabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_automatic_https_rewrites_enabled.zone_automatic_https_rewrites_enabled import (
zone_automatic_https_rewrites_enabled,
)
check = zone_automatic_https_rewrites_enabled()
result = check.execute()
assert len(result) == 0
def test_zone_automatic_https_rewrites_enabled(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(
automatic_https_rewrites="on",
),
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_automatic_https_rewrites_enabled.zone_automatic_https_rewrites_enabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_automatic_https_rewrites_enabled.zone_automatic_https_rewrites_enabled import (
zone_automatic_https_rewrites_enabled,
)
check = zone_automatic_https_rewrites_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == ZONE_ID
assert result[0].resource_name == ZONE_NAME
assert result[0].status == "PASS"
assert "Automatic HTTPS Rewrites is enabled" in result[0].status_extended
def test_zone_automatic_https_rewrites_disabled(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(
automatic_https_rewrites="off",
),
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_automatic_https_rewrites_enabled.zone_automatic_https_rewrites_enabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_automatic_https_rewrites_enabled.zone_automatic_https_rewrites_enabled import (
zone_automatic_https_rewrites_enabled,
)
check = zone_automatic_https_rewrites_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
"Automatic HTTPS Rewrites is not enabled" in result[0].status_extended
)
def test_zone_automatic_https_rewrites_none(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(
automatic_https_rewrites=None,
),
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_automatic_https_rewrites_enabled.zone_automatic_https_rewrites_enabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_automatic_https_rewrites_enabled.zone_automatic_https_rewrites_enabled import (
zone_automatic_https_rewrites_enabled,
)
check = zone_automatic_https_rewrites_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
"Automatic HTTPS Rewrites is not enabled" in result[0].status_extended
)

View File

@@ -0,0 +1,139 @@
from unittest import mock
from prowler.providers.cloudflare.services.zone.zone_service import (
CloudflareZone,
CloudflareZoneSettings,
)
from tests.providers.cloudflare.cloudflare_fixtures import (
ZONE_ID,
ZONE_NAME,
set_mocked_cloudflare_provider,
)
class Test_zone_email_obfuscation_enabled:
def test_no_zones(self):
zone_client = mock.MagicMock
zone_client.zones = {}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_email_obfuscation_enabled.zone_email_obfuscation_enabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_email_obfuscation_enabled.zone_email_obfuscation_enabled import (
zone_email_obfuscation_enabled,
)
check = zone_email_obfuscation_enabled()
result = check.execute()
assert len(result) == 0
def test_zone_email_obfuscation_enabled(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(
email_obfuscation="on",
),
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_email_obfuscation_enabled.zone_email_obfuscation_enabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_email_obfuscation_enabled.zone_email_obfuscation_enabled import (
zone_email_obfuscation_enabled,
)
check = zone_email_obfuscation_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == ZONE_ID
assert result[0].resource_name == ZONE_NAME
assert result[0].status == "PASS"
assert "Email Obfuscation is enabled" in result[0].status_extended
def test_zone_email_obfuscation_disabled(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(
email_obfuscation="off",
),
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_email_obfuscation_enabled.zone_email_obfuscation_enabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_email_obfuscation_enabled.zone_email_obfuscation_enabled import (
zone_email_obfuscation_enabled,
)
check = zone_email_obfuscation_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "Email Obfuscation is not enabled" in result[0].status_extended
def test_zone_email_obfuscation_none(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(
email_obfuscation=None,
),
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_email_obfuscation_enabled.zone_email_obfuscation_enabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_email_obfuscation_enabled.zone_email_obfuscation_enabled import (
zone_email_obfuscation_enabled,
)
check = zone_email_obfuscation_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "Email Obfuscation is not enabled" in result[0].status_extended

View File

@@ -0,0 +1,323 @@
from typing import Optional
from unittest import mock
from pydantic import BaseModel
from prowler.providers.cloudflare.services.zone.zone_service import (
CloudflareZone,
CloudflareZoneSettings,
)
from tests.providers.cloudflare.cloudflare_fixtures import (
ZONE_ID,
ZONE_NAME,
set_mocked_cloudflare_provider,
)
class CloudflareDNSRecord(BaseModel):
"""Cloudflare DNS record representation for testing."""
id: str
zone_id: str
zone_name: str
name: Optional[str] = None
type: Optional[str] = None
content: str = ""
ttl: Optional[int] = None
proxied: bool = False
class Test_zone_record_caa_exists:
def test_no_zones(self):
zone_client = mock.MagicMock
zone_client.zones = {}
dns_client = mock.MagicMock
dns_client.records = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_caa_exists.zone_record_caa_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_caa_exists.zone_record_caa_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_caa_exists.zone_record_caa_exists import (
zone_record_caa_exists,
)
check = zone_record_caa_exists()
result = check.execute()
assert len(result) == 0
def test_zone_with_caa_record_issue_tag(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=ZONE_NAME,
type="CAA",
content='0 issue "letsencrypt.org"',
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_caa_exists.zone_record_caa_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_caa_exists.zone_record_caa_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_caa_exists.zone_record_caa_exists import (
zone_record_caa_exists,
)
check = zone_record_caa_exists()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == ZONE_ID
assert result[0].resource_name == ZONE_NAME
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CAA record with certificate issuance restrictions exists for zone {ZONE_NAME}: {ZONE_NAME}."
)
def test_zone_with_multiple_caa_records(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=ZONE_NAME,
type="CAA",
content='0 issue "letsencrypt.org"',
),
CloudflareDNSRecord(
id="record-2",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=ZONE_NAME,
type="CAA",
content='0 issuewild ";"',
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_caa_exists.zone_record_caa_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_caa_exists.zone_record_caa_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_caa_exists.zone_record_caa_exists import (
zone_record_caa_exists,
)
check = zone_record_caa_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CAA record with certificate issuance restrictions exists for zone {ZONE_NAME}: {ZONE_NAME}, {ZONE_NAME}."
)
def test_zone_with_caa_record_only_iodef(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=ZONE_NAME,
type="CAA",
content='0 iodef "mailto:security@example.com"',
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_caa_exists.zone_record_caa_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_caa_exists.zone_record_caa_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_caa_exists.zone_record_caa_exists import (
zone_record_caa_exists,
)
check = zone_record_caa_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CAA record exists for zone {ZONE_NAME} but does not specify authorized CAs with issue or issuewild tags: {ZONE_NAME}."
)
def test_zone_without_caa_record(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=ZONE_NAME,
type="A",
content="192.0.2.1",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_caa_exists.zone_record_caa_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_caa_exists.zone_record_caa_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_caa_exists.zone_record_caa_exists import (
zone_record_caa_exists,
)
check = zone_record_caa_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"No CAA record found for zone {ZONE_NAME}."
)
def test_zone_with_caa_record_for_different_zone(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id="other-zone-id",
zone_name="other.com",
name="other.com",
type="CAA",
content='0 issue "letsencrypt.org"',
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_caa_exists.zone_record_caa_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_caa_exists.zone_record_caa_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_caa_exists.zone_record_caa_exists import (
zone_record_caa_exists,
)
check = zone_record_caa_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"No CAA record found for zone {ZONE_NAME}."
)

View File

@@ -0,0 +1,646 @@
from typing import Optional
from unittest import mock
from pydantic import BaseModel
from prowler.providers.cloudflare.services.zone.zone_service import (
CloudflareZone,
CloudflareZoneSettings,
)
from tests.providers.cloudflare.cloudflare_fixtures import (
ZONE_ID,
ZONE_NAME,
set_mocked_cloudflare_provider,
)
class CloudflareDNSRecord(BaseModel):
"""Cloudflare DNS record representation for testing."""
id: str
zone_id: str
zone_name: str
name: Optional[str] = None
type: Optional[str] = None
content: str = ""
ttl: Optional[int] = None
proxied: bool = False
# Valid DKIM public key for testing (real RSA 2048-bit key in DER SubjectPublicKeyInfo format)
# This is a complete valid RSA public key that can be loaded by cryptography library
VALID_DKIM_KEY = "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAp4czBy2GlDrezAtyoKtrqZYpTLMsuJz1HjV0wZ/yIpClhKp5f8xGlAJuxOjxWokz5SoyW/XpmUtIPkFYwj90jlvUVkFhh9Q81BlJ/0DmhNnmIOs9MnVzgnLiUfNv06NQeKg3d65reCWNjEyrb1fDP6U4ePKM/lunTQc5CbHEUnSnU43vXpUO8v1TYb6OGeAKhumfVSdXFBF905c43/sqkt2QeRMabIoWPkYlSI0KSV0qhNpcRtOdfntFSyPljwa7iNVLlV9AckdL4+abOiy8zuYW0GDF5/1Jgl/Xbdab2M70AXuFnYldq6EgkhvyyiGEm7/15H5STgKxp8idarb6XQIDAQAB"
class Test_zone_record_dkim_exists:
def test_no_zones(self):
zone_client = mock.MagicMock
zone_client.zones = {}
dns_client = mock.MagicMock
dns_client.records = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists import (
zone_record_dkim_exists,
)
check = zone_record_dkim_exists()
result = check.execute()
assert len(result) == 0
def test_zone_with_dkim_record_valid_key(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=f"google._domainkey.{ZONE_NAME}",
type="TXT",
content=f"v=DKIM1; k=rsa; p={VALID_DKIM_KEY}",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists import (
zone_record_dkim_exists,
)
check = zone_record_dkim_exists()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == ZONE_ID
assert result[0].resource_name == ZONE_NAME
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"DKIM record with valid public key exists for zone {ZONE_NAME}: google._domainkey.{ZONE_NAME}."
)
def test_zone_with_multiple_dkim_records(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=f"google._domainkey.{ZONE_NAME}",
type="TXT",
content=f"v=DKIM1; k=rsa; p={VALID_DKIM_KEY}",
),
CloudflareDNSRecord(
id="record-2",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=f"selector1._domainkey.{ZONE_NAME}",
type="TXT",
content=f"v=DKIM1; k=rsa; p={VALID_DKIM_KEY}",
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists import (
zone_record_dkim_exists,
)
check = zone_record_dkim_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"DKIM record with valid public key exists for zone {ZONE_NAME}: google._domainkey.{ZONE_NAME}, selector1._domainkey.{ZONE_NAME}."
)
def test_zone_with_dkim_record_revoked_key(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=f"google._domainkey.{ZONE_NAME}",
type="TXT",
content="v=DKIM1; k=rsa; p=",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists import (
zone_record_dkim_exists,
)
check = zone_record_dkim_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"DKIM record exists for zone {ZONE_NAME} but has invalid or missing public key: google._domainkey.{ZONE_NAME}."
)
def test_zone_with_dkim_record_invalid_key_not_real_public_key(self):
"""Test that valid Base64 that is not a real public key fails."""
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=f"google._domainkey.{ZONE_NAME}",
type="TXT",
# Valid Base64 but not a valid DER-encoded public key
content="v=DKIM1; k=rsa; p=SGVsbG9Xb3JsZFRoaXNJc05vdEFWYWxpZFB1YmxpY0tleUJ1dEl0SXNWYWxpZEJhc2U2NA==",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists import (
zone_record_dkim_exists,
)
check = zone_record_dkim_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"DKIM record exists for zone {ZONE_NAME} but has invalid or missing public key: google._domainkey.{ZONE_NAME}."
)
def test_zone_with_dkim_record_invalid_base64(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=f"google._domainkey.{ZONE_NAME}",
type="TXT",
# Invalid Base64 - contains characters not valid in Base64 and is long enough
content="v=DKIM1; k=rsa; p=ThisIsNotValidBase64!!!@@@###$$$%%%^^^&&&***Because_It_Contains_Invalid_Characters_And_Is_Long_Enough_To_Pass_Length_Check",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists import (
zone_record_dkim_exists,
)
check = zone_record_dkim_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"DKIM record exists for zone {ZONE_NAME} but has invalid or missing public key: google._domainkey.{ZONE_NAME}."
)
def test_zone_without_dkim_record(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=ZONE_NAME,
type="A",
content="192.0.2.1",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists import (
zone_record_dkim_exists,
)
check = zone_record_dkim_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"No DKIM record found for zone {ZONE_NAME}."
)
def test_zone_with_domainkey_but_not_dkim(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=f"google._domainkey.{ZONE_NAME}",
type="TXT",
content="some other txt record",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists import (
zone_record_dkim_exists,
)
check = zone_record_dkim_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"No DKIM record found for zone {ZONE_NAME}."
)
def test_zone_with_dkim_record_lowercase(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=f"default._domainkey.{ZONE_NAME}",
type="TXT",
content=f"v=dkim1; k=rsa; p={VALID_DKIM_KEY}",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists import (
zone_record_dkim_exists,
)
check = zone_record_dkim_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"DKIM record with valid public key exists for zone {ZONE_NAME}: default._domainkey.{ZONE_NAME}."
)
def test_zone_with_dkim_record_different_zone(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id="other-zone-id",
zone_name="other.com",
name="google._domainkey.other.com",
type="TXT",
content=f"v=DKIM1; k=rsa; p={VALID_DKIM_KEY}",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists import (
zone_record_dkim_exists,
)
check = zone_record_dkim_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"No DKIM record found for zone {ZONE_NAME}."
)
def test_zone_with_dkim_record_quoted_content(self):
"""Test that DKIM records with quoted content from Cloudflare API work."""
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=f"google._domainkey.{ZONE_NAME}",
type="TXT",
# Cloudflare API returns content wrapped in quotes
content=f'"v=DKIM1; k=rsa; p={VALID_DKIM_KEY}"',
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists import (
zone_record_dkim_exists,
)
check = zone_record_dkim_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"DKIM record with valid public key exists for zone {ZONE_NAME}: google._domainkey.{ZONE_NAME}."
)
def test_zone_with_dkim_record_split_quoted_content(self):
"""Test that long DKIM records split into multiple quoted strings work."""
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
# Split the key to simulate how Cloudflare returns long TXT records
# The split happens in the middle of the p= value with '" "' between parts
key_part1 = VALID_DKIM_KEY[:200]
key_part2 = VALID_DKIM_KEY[200:]
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=f"google._domainkey.{ZONE_NAME}",
type="TXT",
# Cloudflare splits long TXT records with '" "' between parts
content=f'v=DKIM1; k=rsa; p={key_part1}" "{key_part2}',
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_dkim_exists.zone_record_dkim_exists import (
zone_record_dkim_exists,
)
check = zone_record_dkim_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"DKIM record with valid public key exists for zone {ZONE_NAME}: google._domainkey.{ZONE_NAME}."
)

View File

@@ -0,0 +1,417 @@
from typing import Optional
from unittest import mock
from pydantic import BaseModel
from prowler.providers.cloudflare.services.zone.zone_service import (
CloudflareZone,
CloudflareZoneSettings,
)
from tests.providers.cloudflare.cloudflare_fixtures import (
ZONE_ID,
ZONE_NAME,
set_mocked_cloudflare_provider,
)
class CloudflareDNSRecord(BaseModel):
"""Cloudflare DNS record representation for testing."""
id: str
zone_id: str
zone_name: str
name: Optional[str] = None
type: Optional[str] = None
content: str = ""
ttl: Optional[int] = None
proxied: bool = False
class Test_zone_record_dmarc_exists:
def test_no_zones(self):
zone_client = mock.MagicMock
zone_client.zones = {}
dns_client = mock.MagicMock
dns_client.records = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists import (
zone_record_dmarc_exists,
)
check = zone_record_dmarc_exists()
result = check.execute()
assert len(result) == 0
def test_zone_with_dmarc_record_reject_policy(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=f"_dmarc.{ZONE_NAME}",
type="TXT",
content="v=DMARC1; p=reject; rua=mailto:dmarc@example.com",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists import (
zone_record_dmarc_exists,
)
check = zone_record_dmarc_exists()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == ZONE_ID
assert result[0].resource_name == ZONE_NAME
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"DMARC record with enforcement policy p=reject exists for zone {ZONE_NAME}: _dmarc.{ZONE_NAME}."
)
def test_zone_with_dmarc_record_quarantine_policy(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=f"_dmarc.{ZONE_NAME}",
type="TXT",
content="v=DMARC1; p=quarantine; rua=mailto:dmarc@example.com",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists import (
zone_record_dmarc_exists,
)
check = zone_record_dmarc_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"DMARC record with enforcement policy p=quarantine exists for zone {ZONE_NAME}: _dmarc.{ZONE_NAME}."
)
def test_zone_with_dmarc_record_none_policy(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=f"_dmarc.{ZONE_NAME}",
type="TXT",
content="v=DMARC1; p=none; rua=mailto:dmarc@example.com",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists import (
zone_record_dmarc_exists,
)
check = zone_record_dmarc_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"DMARC record exists for zone {ZONE_NAME} but uses monitoring-only policy p=none: _dmarc.{ZONE_NAME}."
)
def test_zone_without_dmarc_record(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=ZONE_NAME,
type="A",
content="192.0.2.1",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists import (
zone_record_dmarc_exists,
)
check = zone_record_dmarc_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"No DMARC record found for zone {ZONE_NAME}."
)
def test_zone_with_txt_record_but_not_dmarc(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=f"_dmarc.{ZONE_NAME}",
type="TXT",
content="some other txt record",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists import (
zone_record_dmarc_exists,
)
check = zone_record_dmarc_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"No DMARC record found for zone {ZONE_NAME}."
)
def test_zone_with_dmarc_record_lowercase(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=f"_dmarc.{ZONE_NAME}",
type="TXT",
content="v=dmarc1; p=reject; rua=mailto:dmarc@example.com",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists import (
zone_record_dmarc_exists,
)
check = zone_record_dmarc_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"DMARC record with enforcement policy p=reject exists for zone {ZONE_NAME}: _dmarc.{ZONE_NAME}."
)
def test_zone_with_dmarc_record_different_zone(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id="other-zone-id",
zone_name="other.com",
name="_dmarc.other.com",
type="TXT",
content="v=DMARC1; p=reject",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_dmarc_exists.zone_record_dmarc_exists import (
zone_record_dmarc_exists,
)
check = zone_record_dmarc_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"No DMARC record found for zone {ZONE_NAME}."
)

View File

@@ -0,0 +1,315 @@
from typing import Optional
from unittest import mock
from pydantic import BaseModel
from prowler.providers.cloudflare.services.zone.zone_service import (
CloudflareZone,
CloudflareZoneSettings,
)
from tests.providers.cloudflare.cloudflare_fixtures import (
ZONE_ID,
ZONE_NAME,
set_mocked_cloudflare_provider,
)
class CloudflareDNSRecord(BaseModel):
"""Cloudflare DNS record representation for testing."""
id: str
zone_id: str
zone_name: str
name: Optional[str] = None
type: Optional[str] = None
content: str = ""
ttl: Optional[int] = None
proxied: bool = False
class Test_zone_record_spf_exists:
def test_no_zones(self):
zone_client = mock.MagicMock
zone_client.zones = {}
dns_client = mock.MagicMock
dns_client.records = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_spf_exists.zone_record_spf_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_spf_exists.zone_record_spf_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_spf_exists.zone_record_spf_exists import (
zone_record_spf_exists,
)
check = zone_record_spf_exists()
result = check.execute()
assert len(result) == 0
def test_zone_with_spf_record_strict_policy(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=ZONE_NAME,
type="TXT",
content="v=spf1 include:_spf.google.com -all",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_spf_exists.zone_record_spf_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_spf_exists.zone_record_spf_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_spf_exists.zone_record_spf_exists import (
zone_record_spf_exists,
)
check = zone_record_spf_exists()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == ZONE_ID
assert result[0].resource_name == ZONE_NAME
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"SPF record with strict policy -all exists for zone {ZONE_NAME}: {ZONE_NAME}."
)
def test_zone_with_spf_record_permissive_policy(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=ZONE_NAME,
type="TXT",
content="v=spf1 include:_spf.google.com ~all",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_spf_exists.zone_record_spf_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_spf_exists.zone_record_spf_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_spf_exists.zone_record_spf_exists import (
zone_record_spf_exists,
)
check = zone_record_spf_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"SPF record exists for zone {ZONE_NAME} but does not use strict policy -all: {ZONE_NAME}."
)
def test_zone_without_spf_record(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=ZONE_NAME,
type="A",
content="192.0.2.1",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_spf_exists.zone_record_spf_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_spf_exists.zone_record_spf_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_spf_exists.zone_record_spf_exists import (
zone_record_spf_exists,
)
check = zone_record_spf_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"No SPF record found for zone {ZONE_NAME}."
)
def test_zone_with_txt_record_but_not_spf(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id=ZONE_ID,
zone_name=ZONE_NAME,
name=ZONE_NAME,
type="TXT",
content="google-site-verification=abc123",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_spf_exists.zone_record_spf_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_spf_exists.zone_record_spf_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_spf_exists.zone_record_spf_exists import (
zone_record_spf_exists,
)
check = zone_record_spf_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"No SPF record found for zone {ZONE_NAME}."
)
def test_zone_with_spf_record_different_zone(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(),
)
}
dns_client = mock.MagicMock
dns_client.records = [
CloudflareDNSRecord(
id="record-1",
zone_id="other-zone-id",
zone_name="other.com",
name="other.com",
type="TXT",
content="v=spf1 include:_spf.google.com -all",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_spf_exists.zone_record_spf_exists.zone_client",
new=zone_client,
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_record_spf_exists.zone_record_spf_exists.dns_client",
new=dns_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_record_spf_exists.zone_record_spf_exists import (
zone_record_spf_exists,
)
check = zone_record_spf_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"No SPF record found for zone {ZONE_NAME}."
)

View File

@@ -0,0 +1,210 @@
from unittest import mock
from prowler.providers.cloudflare.services.zone.zone_service import (
CloudflareZone,
CloudflareZoneSettings,
)
from tests.providers.cloudflare.cloudflare_fixtures import (
ZONE_ID,
ZONE_NAME,
set_mocked_cloudflare_provider,
)
class Test_zone_security_under_attack_disabled:
def test_no_zones(self):
zone_client = mock.MagicMock
zone_client.zones = {}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_security_under_attack_disabled.zone_security_under_attack_disabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_security_under_attack_disabled.zone_security_under_attack_disabled import (
zone_security_under_attack_disabled,
)
check = zone_security_under_attack_disabled()
result = check.execute()
assert len(result) == 0
def test_zone_under_attack_mode_enabled(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(
security_level="under_attack",
),
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_security_under_attack_disabled.zone_security_under_attack_disabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_security_under_attack_disabled.zone_security_under_attack_disabled import (
zone_security_under_attack_disabled,
)
check = zone_security_under_attack_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == ZONE_ID
assert result[0].resource_name == ZONE_NAME
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Zone {ZONE_NAME} has Under Attack Mode enabled."
)
def test_zone_security_level_high(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(
security_level="high",
),
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_security_under_attack_disabled.zone_security_under_attack_disabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_security_under_attack_disabled.zone_security_under_attack_disabled import (
zone_security_under_attack_disabled,
)
check = zone_security_under_attack_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Zone {ZONE_NAME} does not have Under Attack Mode enabled."
)
def test_zone_security_level_medium(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(
security_level="medium",
),
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_security_under_attack_disabled.zone_security_under_attack_disabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_security_under_attack_disabled.zone_security_under_attack_disabled import (
zone_security_under_attack_disabled,
)
check = zone_security_under_attack_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
def test_zone_security_level_low(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(
security_level="low",
),
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_security_under_attack_disabled.zone_security_under_attack_disabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_security_under_attack_disabled.zone_security_under_attack_disabled import (
zone_security_under_attack_disabled,
)
check = zone_security_under_attack_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
def test_zone_security_level_none(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(
security_level=None,
),
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_security_under_attack_disabled.zone_security_under_attack_disabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_security_under_attack_disabled.zone_security_under_attack_disabled import (
zone_security_under_attack_disabled,
)
check = zone_security_under_attack_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"

View File

@@ -0,0 +1,173 @@
from unittest import mock
from prowler.providers.cloudflare.services.zone.zone_service import (
CloudflareZone,
CloudflareZoneSettings,
)
from tests.providers.cloudflare.cloudflare_fixtures import (
ZONE_ID,
ZONE_NAME,
set_mocked_cloudflare_provider,
)
class Test_zone_tls_1_3_enabled:
def test_no_zones(self):
zone_client = mock.MagicMock
zone_client.zones = {}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_tls_1_3_enabled.zone_tls_1_3_enabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_tls_1_3_enabled.zone_tls_1_3_enabled import (
zone_tls_1_3_enabled,
)
check = zone_tls_1_3_enabled()
result = check.execute()
assert len(result) == 0
def test_zone_tls_1_3_enabled_on(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(
tls_1_3="on",
),
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_tls_1_3_enabled.zone_tls_1_3_enabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_tls_1_3_enabled.zone_tls_1_3_enabled import (
zone_tls_1_3_enabled,
)
check = zone_tls_1_3_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == ZONE_ID
assert result[0].resource_name == ZONE_NAME
assert result[0].status == "PASS"
assert "TLS 1.3 is enabled" in result[0].status_extended
def test_zone_tls_1_3_enabled_zrt(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(
tls_1_3="zrt",
),
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_tls_1_3_enabled.zone_tls_1_3_enabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_tls_1_3_enabled.zone_tls_1_3_enabled import (
zone_tls_1_3_enabled,
)
check = zone_tls_1_3_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert "TLS 1.3 is enabled" in result[0].status_extended
def test_zone_tls_1_3_disabled(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(
tls_1_3="off",
),
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_tls_1_3_enabled.zone_tls_1_3_enabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_tls_1_3_enabled.zone_tls_1_3_enabled import (
zone_tls_1_3_enabled,
)
check = zone_tls_1_3_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "TLS 1.3 is not enabled" in result[0].status_extended
def test_zone_tls_1_3_none(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(
tls_1_3=None,
),
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_tls_1_3_enabled.zone_tls_1_3_enabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_tls_1_3_enabled.zone_tls_1_3_enabled import (
zone_tls_1_3_enabled,
)
check = zone_tls_1_3_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "TLS 1.3 is not enabled" in result[0].status_extended

View File

@@ -0,0 +1,111 @@
from unittest import mock
from prowler.providers.cloudflare.services.zone.zone_service import (
CloudflareZone,
CloudflareZoneSettings,
)
from tests.providers.cloudflare.cloudflare_fixtures import (
ZONE_ID,
ZONE_NAME,
set_mocked_cloudflare_provider,
)
class Test_zone_universal_ssl_enabled:
def test_no_zones(self):
zone_client = mock.MagicMock
zone_client.zones = {}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_universal_ssl_enabled.zone_universal_ssl_enabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_universal_ssl_enabled.zone_universal_ssl_enabled import (
zone_universal_ssl_enabled,
)
check = zone_universal_ssl_enabled()
result = check.execute()
assert len(result) == 0
def test_zone_universal_ssl_enabled(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(
universal_ssl_enabled=True,
),
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_universal_ssl_enabled.zone_universal_ssl_enabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_universal_ssl_enabled.zone_universal_ssl_enabled import (
zone_universal_ssl_enabled,
)
check = zone_universal_ssl_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == ZONE_ID
assert result[0].resource_name == ZONE_NAME
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Universal SSL is enabled for zone {ZONE_NAME}."
)
def test_zone_universal_ssl_disabled(self):
zone_client = mock.MagicMock
zone_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(
universal_ssl_enabled=False,
),
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zone.zone_universal_ssl_enabled.zone_universal_ssl_enabled.zone_client",
new=zone_client,
),
):
from prowler.providers.cloudflare.services.zone.zone_universal_ssl_enabled.zone_universal_ssl_enabled import (
zone_universal_ssl_enabled,
)
check = zone_universal_ssl_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Universal SSL is not enabled for zone {ZONE_NAME}."
)