feat: enhance zones_caa_record_exists and zones_universal_ssl_enabled

This commit is contained in:
HugoPBrito
2026-01-07 16:09:24 +01:00
parent 979ae1150c
commit a7d8c8f679
5 changed files with 51 additions and 48 deletions

View File

@@ -22,9 +22,16 @@ class zones_caa_record_exists(Check):
if caa_records:
report.status = "PASS"
# Extract CA names from CAA record content (format: "0 issue "ca.org"")
ca_names = []
for record in caa_records:
parts = record.content.split()
if len(parts) >= 3:
ca_names.append(parts[2].strip('"'))
else:
ca_names.append(record.content)
report.status_extended = (
f"CAA record exists for zone {zone.name} "
f"({len(caa_records)} record(s))."
f"CAA record exists for zone {zone.name}: {', '.join(ca_names)}."
)
else:
report.status = "FAIL"

View File

@@ -16,6 +16,7 @@ class Zones(CloudflareService):
self._list_zones()
self._get_zones_settings()
self._get_zones_dnssec()
self._get_zones_universal_ssl()
def _list_zones(self) -> None:
"""List all Cloudflare zones with their basic information."""
@@ -107,6 +108,20 @@ class Zones(CloudflareService):
f"{zone.id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _get_zones_universal_ssl(self) -> None:
"""Get Universal SSL settings for all zones."""
logger.info("Zones - Getting Universal SSL settings...")
for zone in self.zones.values():
try:
universal_ssl = self.client.ssl.universal.settings.get(zone_id=zone.id)
zone.settings.universal_ssl_enabled = getattr(
universal_ssl, "enabled", False
)
except Exception as error:
logger.error(
f"{zone.id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _get_zone_setting(self, zone_id: str, setting_id: str):
"""Get a single zone setting by ID."""
try:
@@ -127,7 +142,6 @@ class Zones(CloudflareService):
"ssl",
"tls_1_3",
"automatic_https_rewrites",
"universal_ssl",
"security_header",
"waf",
"security_level",
@@ -148,7 +162,6 @@ class Zones(CloudflareService):
ssl_encryption_mode=settings.get("ssl"),
tls_1_3=settings.get("tls_1_3"),
automatic_https_rewrites=settings.get("automatic_https_rewrites"),
universal_ssl=settings.get("universal_ssl"),
strict_transport_security=self._get_strict_transport_security(
settings.get("security_header")
),
@@ -210,7 +223,7 @@ class CloudflareZoneSettings(BaseModel):
ssl_encryption_mode: Optional[str] = None
tls_1_3: Optional[str] = None
automatic_https_rewrites: Optional[str] = None
universal_ssl: Optional[str] = None
universal_ssl_enabled: bool = False
# HSTS settings
strict_transport_security: StrictTransportSecurity = Field(
default_factory=StrictTransportSecurity

View File

@@ -10,8 +10,7 @@ class zones_universal_ssl_enabled(Check):
metadata=self.metadata(),
resource=zone,
)
universal_ssl = (zone.settings.universal_ssl or "").lower()
if universal_ssl == "on":
if zone.settings.universal_ssl_enabled:
report.status = "PASS"
report.status_extended = (
f"Universal SSL is enabled for zone {zone.name}."

View File

@@ -105,7 +105,10 @@ class Test_zones_caa_record_exists:
assert result[0].resource_id == ZONE_ID
assert result[0].resource_name == ZONE_NAME
assert result[0].status == "PASS"
assert "CAA record exists" in result[0].status_extended
assert (
result[0].status_extended
== f"CAA record exists for zone {ZONE_NAME}: letsencrypt.org."
)
def test_zone_with_multiple_caa_records(self):
zones_client = mock.MagicMock
@@ -161,7 +164,10 @@ class Test_zones_caa_record_exists:
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert "2 record(s)" in result[0].status_extended
assert (
result[0].status_extended
== f"CAA record exists for zone {ZONE_NAME}: letsencrypt.org, ;."
)
def test_zone_without_caa_record(self):
zones_client = mock.MagicMock
@@ -209,7 +215,10 @@ class Test_zones_caa_record_exists:
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "No CAA record found" in result[0].status_extended
assert (
result[0].status_extended
== f"No CAA record found for zone {ZONE_NAME}."
)
def test_zone_with_caa_record_for_different_zone(self):
zones_client = mock.MagicMock
@@ -257,4 +266,7 @@ class Test_zones_caa_record_exists:
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "No CAA record found" in result[0].status_extended
assert (
result[0].status_extended
== f"No CAA record found for zone {ZONE_NAME}."
)

View File

@@ -43,7 +43,7 @@ class Test_zones_universal_ssl_enabled:
status="active",
paused=False,
settings=CloudflareZoneSettings(
universal_ssl="on",
universal_ssl_enabled=True,
),
)
}
@@ -68,7 +68,10 @@ class Test_zones_universal_ssl_enabled:
assert result[0].resource_id == ZONE_ID
assert result[0].resource_name == ZONE_NAME
assert result[0].status == "PASS"
assert "Universal SSL is enabled" in result[0].status_extended
assert (
result[0].status_extended
== f"Universal SSL is enabled for zone {ZONE_NAME}."
)
def test_zone_universal_ssl_disabled(self):
zones_client = mock.MagicMock
@@ -79,7 +82,7 @@ class Test_zones_universal_ssl_enabled:
status="active",
paused=False,
settings=CloudflareZoneSettings(
universal_ssl="off",
universal_ssl_enabled=False,
),
)
}
@@ -102,38 +105,7 @@ class Test_zones_universal_ssl_enabled:
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "Universal SSL is not enabled" in result[0].status_extended
def test_zone_universal_ssl_none(self):
zones_client = mock.MagicMock
zones_client.zones = {
ZONE_ID: CloudflareZone(
id=ZONE_ID,
name=ZONE_NAME,
status="active",
paused=False,
settings=CloudflareZoneSettings(
universal_ssl=None,
),
assert (
result[0].status_extended
== f"Universal SSL is not enabled for zone {ZONE_NAME}."
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_cloudflare_provider(),
),
mock.patch(
"prowler.providers.cloudflare.services.zones.zones_universal_ssl_enabled.zones_universal_ssl_enabled.zones_client",
new=zones_client,
),
):
from prowler.providers.cloudflare.services.zones.zones_universal_ssl_enabled.zones_universal_ssl_enabled import (
zones_universal_ssl_enabled,
)
check = zones_universal_ssl_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "Universal SSL is not enabled" in result[0].status_extended