Merge branch 'cloudflare-pr2-tls-email-checks' into cloudflare-pr3-bot-config-checks

This commit is contained in:
HugoPBrito
2025-12-16 12:42:46 +01:00
13 changed files with 84 additions and 32 deletions

View File

@@ -1,6 +1,3 @@
from os import environ
def init_parser(self):
"""Init the Cloudflare provider CLI parser."""
cloudflare_parser = self.subparsers.add_parser(
@@ -17,18 +14,3 @@ def init_parser(self):
metavar="ZONE",
help="Filter scan to specific Cloudflare zones (name or ID).",
)
def validate_arguments(arguments) -> tuple[bool, str]:
"""Validate Cloudflare provider arguments."""
token = environ.get("CLOUDFLARE_API_TOKEN", "")
api_key = environ.get("CLOUDFLARE_API_KEY", "")
api_email = environ.get("CLOUDFLARE_API_EMAIL", "")
if not token and not (api_key and api_email):
return (
False,
"Cloudflare provider requires CLOUDFLARE_API_TOKEN or the combination of CLOUDFLARE_API_KEY and CLOUDFLARE_API_EMAIL environment variables.",
)
return (True, "")

View File

@@ -0,0 +1,4 @@
from prowler.providers.cloudflare.services.dns.dns_service import DNS
from prowler.providers.common.provider import Provider
dns_client = DNS(Provider.get_global_provider())

View File

@@ -0,0 +1,64 @@
from typing import Optional
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.cloudflare.lib.service.service import CloudflareService
from prowler.providers.cloudflare.services.zones.zones_client import zones_client
class DNS(CloudflareService):
"""Retrieve Cloudflare DNS records for all zones."""
def __init__(self, provider):
super().__init__(__class__.__name__, provider)
self.records: list["CloudflareDNSRecord"] = []
self._list_dns_records()
def _list_dns_records(self) -> None:
"""List DNS records for all zones."""
logger.info("DNS - Listing DNS records...")
try:
for zone in zones_client.zones.values():
seen_record_ids: set[str] = set()
try:
for record in self.client.dns.records.list(zone_id=zone.id):
record_id = getattr(record, "id", None)
# Prevent infinite loop
if record_id in seen_record_ids:
break
seen_record_ids.add(record_id)
self.records.append(
CloudflareDNSRecord(
id=record_id,
zone_id=zone.id,
zone_name=zone.name,
name=getattr(record, "name", None),
type=getattr(record, "type", None),
content=getattr(record, "content", ""),
ttl=getattr(record, "ttl", None),
proxied=getattr(record, "proxied", False),
)
)
except Exception as error:
logger.error(
f"{zone.id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class CloudflareDNSRecord(BaseModel):
"""Cloudflare DNS record representation."""
id: str
zone_id: str
zone_name: str
name: Optional[str] = None
type: Optional[str] = None
content: str = ""
ttl: Optional[int] = None
proxied: bool = False

View File

@@ -5,7 +5,7 @@ from prowler.providers.cloudflare.services.zones.zones_client import zones_clien
class zones_automatic_https_rewrites_enabled(Check):
def execute(self) -> list[CheckReportCloudflare]:
findings = []
for zone in zones_client.zones:
for zone in zones_client.zones.values():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=zone,

View File

@@ -7,7 +7,7 @@ class zones_caa_record_exists(Check):
def execute(self) -> list[CheckReportCloudflare]:
findings = []
for zone in zones_client.zones:
for zone in zones_client.zones.values():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=zone,
@@ -17,7 +17,7 @@ class zones_caa_record_exists(Check):
caa_records = [
record
for record in dns_client.records
if record.zone.id == zone.id and record.type == "CAA"
if record.zone_id == zone.id and record.type == "CAA"
]
if caa_records:

View File

@@ -7,7 +7,7 @@ class zones_dmarc_record_exists(Check):
def execute(self) -> list[CheckReportCloudflare]:
findings = []
for zone in zones_client.zones:
for zone in zones_client.zones.values():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=zone,
@@ -17,8 +17,9 @@ class zones_dmarc_record_exists(Check):
dmarc_records = [
record
for record in dns_client.records
if record.zone.id == zone.id
if record.zone_id == zone.id
and record.type == "TXT"
and record.name
and record.name.startswith("_dmarc")
and "v=DMARC1" in record.content.upper()
]

View File

@@ -5,7 +5,7 @@ from prowler.providers.cloudflare.services.zones.zones_client import zones_clien
class zones_email_obfuscation_enabled(Check):
def execute(self) -> list[CheckReportCloudflare]:
findings = []
for zone in zones_client.zones:
for zone in zones_client.zones.values():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=zone,

View File

@@ -5,15 +5,16 @@ from prowler.providers.cloudflare.services.zones.zones_client import zones_clien
class zones_hsts_include_subdomains(Check):
def execute(self) -> list[CheckReportCloudflare]:
findings = []
for zone in zones_client.zones:
for zone in zones_client.zones.values():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=zone,
)
if zone.settings.hsts_enabled and zone.settings.hsts_include_subdomains:
hsts = zone.settings.strict_transport_security
if hsts.enabled and hsts.include_subdomains:
report.status = "PASS"
report.status_extended = f"HSTS is enabled with includeSubDomains directive for zone {zone.name}."
elif zone.settings.hsts_enabled:
elif hsts.enabled:
report.status = "FAIL"
report.status_extended = f"HSTS is enabled but does not include subdomains for zone {zone.name}."
else:

View File

@@ -7,7 +7,7 @@ class zones_security_level(Check):
findings = []
acceptable_levels = ["medium", "high", "under_attack"]
for zone in zones_client.zones:
for zone in zones_client.zones.values():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=zone,

View File

@@ -7,7 +7,7 @@ class zones_spf_record_exists(Check):
def execute(self) -> list[CheckReportCloudflare]:
findings = []
for zone in zones_client.zones:
for zone in zones_client.zones.values():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=zone,
@@ -17,7 +17,7 @@ class zones_spf_record_exists(Check):
spf_records = [
record
for record in dns_client.records
if record.zone.id == zone.id
if record.zone_id == zone.id
and record.type == "TXT"
and record.content.startswith("v=spf1")
]

View File

@@ -5,7 +5,7 @@ from prowler.providers.cloudflare.services.zones.zones_client import zones_clien
class zones_tls_1_3_enabled(Check):
def execute(self) -> list[CheckReportCloudflare]:
findings = []
for zone in zones_client.zones:
for zone in zones_client.zones.values():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=zone,

View File

@@ -5,7 +5,7 @@ from prowler.providers.cloudflare.services.zones.zones_client import zones_clien
class zones_universal_ssl_enabled(Check):
def execute(self) -> list[CheckReportCloudflare]:
findings = []
for zone in zones_client.zones:
for zone in zones_client.zones.values():
report = CheckReportCloudflare(
metadata=self.metadata(),
resource=zone,