fix(compliance): multi-section undercount & leaked provider tab (#11635)

Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
This commit is contained in:
Prowler Bot
2026-06-18 10:40:20 +02:00
committed by GitHub
parent dcc9401957
commit f6679fadf4
26 changed files with 1904 additions and 87 deletions
+8
View File
@@ -2,6 +2,14 @@
All notable changes to the **Prowler SDK** are documented in this file.
## [5.30.3] (Prowler UNRELEASED)
### 🐞 Fixed
- CLI compliance summary tables no longer undercount findings mapped to multiple sections nor double-count a single finding mapped to several requirements within the same group/split, and the Provider column no longer leaks a value from another framework [(#11567)](https://github.com/prowler-cloud/prowler/pull/11567)
---
## [5.30.2] (Prowler v5.30.2)
### 🐞 Fixed
@@ -22,11 +22,14 @@ def get_asd_essential_eight_table(
pass_count = []
fail_count = []
muted_count = []
section_seen = {}
provider = ""
for index, finding in enumerate(findings):
check = bulk_checks_metadata[finding.check_metadata.CheckID]
check_compliances = check.Compliance
for compliance in check_compliances:
if compliance.Framework == "ASD-Essential-Eight":
provider = compliance.Provider
for requirement in compliance.Requirements:
for attribute in requirement.Attributes:
section = attribute.Section
@@ -36,21 +39,33 @@ def get_asd_essential_eight_table(
"PASS": 0,
"Muted": 0,
}
section_seen[section] = set()
# Overview totals: count each finding once per framework
if finding.muted:
if index not in muted_count:
muted_count.append(index)
sections[section]["Muted"] += 1
else:
if finding.status == "FAIL" and index not in fail_count:
elif finding.status == "FAIL":
if index not in fail_count:
fail_count.append(index)
sections[section]["FAIL"] += 1
elif finding.status == "PASS" and index not in pass_count:
elif finding.status == "PASS":
if index not in pass_count:
pass_count.append(index)
# Per-section counts: count each finding once per section
# it belongs to (a finding can map to several sections).
if index not in section_seen[section]:
section_seen[section].add(index)
if finding.muted:
sections[section]["Muted"] += 1
elif finding.status == "FAIL":
sections[section]["FAIL"] += 1
elif finding.status == "PASS":
sections[section]["PASS"] += 1
sections = dict(sorted(sections.items()))
for section in sections:
asd_essential_eight_compliance_table["Provider"].append(compliance.Provider)
asd_essential_eight_compliance_table["Provider"].append(provider)
asd_essential_eight_compliance_table["Section"].append(section)
if sections[section]["FAIL"] > 0:
asd_essential_eight_compliance_table["Status"].append(
+20 -6
View File
@@ -22,33 +22,47 @@ def get_c5_table(
fail_count = []
muted_count = []
sections = {}
section_seen = {}
provider = ""
for index, finding in enumerate(findings):
check = bulk_checks_metadata[finding.check_metadata.CheckID]
check_compliances = check.Compliance
for compliance in check_compliances:
if compliance.Framework == "C5":
provider = compliance.Provider
for requirement in compliance.Requirements:
for attribute in requirement.Attributes:
section = attribute.Section
if section not in sections:
sections[section] = {"FAIL": 0, "PASS": 0, "Muted": 0}
section_seen[section] = set()
# Overview totals: count each finding once per framework
if finding.muted:
if index not in muted_count:
muted_count.append(index)
sections[section]["Muted"] += 1
else:
if finding.status == "FAIL" and index not in fail_count:
elif finding.status == "FAIL":
if index not in fail_count:
fail_count.append(index)
sections[section]["FAIL"] += 1
elif finding.status == "PASS" and index not in pass_count:
elif finding.status == "PASS":
if index not in pass_count:
pass_count.append(index)
# Per-section counts: count each finding once per section
# it belongs to (a finding can map to several sections).
if index not in section_seen[section]:
section_seen[section].add(index)
if finding.muted:
sections[section]["Muted"] += 1
elif finding.status == "FAIL":
sections[section]["FAIL"] += 1
elif finding.status == "PASS":
sections[section]["PASS"] += 1
sections = dict(sorted(sections.items()))
for section in sections:
section_table["Provider"].append(compliance.Provider)
section_table["Provider"].append(provider)
section_table["Section"].append(section)
if sections[section]["FAIL"] > 0:
section_table["Status"].append(
+20 -6
View File
@@ -22,33 +22,47 @@ def get_ccc_table(
fail_count = []
muted_count = []
sections = {}
section_seen = {}
provider = ""
for index, finding in enumerate(findings):
check = bulk_checks_metadata[finding.check_metadata.CheckID]
check_compliances = check.Compliance
for compliance in check_compliances:
if compliance.Framework == "CCC":
provider = compliance.Provider
for requirement in compliance.Requirements:
for attribute in requirement.Attributes:
section = attribute.Section
if section not in sections:
sections[section] = {"FAIL": 0, "PASS": 0, "Muted": 0}
section_seen[section] = set()
# Overview totals: count each finding once per framework
if finding.muted:
if index not in muted_count:
muted_count.append(index)
sections[section]["Muted"] += 1
else:
if finding.status == "FAIL" and index not in fail_count:
elif finding.status == "FAIL":
if index not in fail_count:
fail_count.append(index)
sections[section]["FAIL"] += 1
elif finding.status == "PASS" and index not in pass_count:
elif finding.status == "PASS":
if index not in pass_count:
pass_count.append(index)
# Per-section counts: count each finding once per section
# it belongs to (a finding can map to several sections).
if index not in section_seen[section]:
section_seen[section].add(index)
if finding.muted:
sections[section]["Muted"] += 1
elif finding.status == "FAIL":
sections[section]["FAIL"] += 1
elif finding.status == "PASS":
sections[section]["PASS"] += 1
sections = dict(sorted(sections.items()))
for section in sections:
section_table["Provider"].append(compliance.Provider)
section_table["Provider"].append(provider)
section_table["Section"].append(section)
if sections[section]["FAIL"] > 0:
section_table["Status"].append(
+25 -3
View File
@@ -13,6 +13,9 @@ def get_cis_table(
compliance_overview: bool,
):
sections = {}
section_muted_seen = {}
section_split_seen = {}
provider = ""
cis_compliance_table = {
"Provider": [],
"Section": [],
@@ -29,6 +32,7 @@ def get_cis_table(
for compliance in check_compliances:
version_in_name = compliance_framework.split("_")[1]
if compliance.Framework == "CIS" and version_in_name in compliance.Version:
provider = compliance.Provider
for requirement in compliance.Requirements:
for attribute in requirement.Attributes:
section = attribute.Section
@@ -40,9 +44,19 @@ def get_cis_table(
"Level 2": {"FAIL": 0, "PASS": 0},
"Muted": 0,
}
section_muted_seen[section] = set()
section_split_seen[section] = {
"Level 1": set(),
"Level 2": set(),
}
if finding.muted:
# Overview total: count each finding once per framework
if index not in muted_count:
muted_count.append(index)
# Per-section Muted: count each finding once per section
# it belongs to (a finding can map to several sections).
if index not in section_muted_seen[section]:
section_muted_seen[section].add(index)
sections[section]["Muted"] += 1
else:
if finding.status == "FAIL" and index not in fail_count:
@@ -50,13 +64,21 @@ def get_cis_table(
elif finding.status == "PASS" and index not in pass_count:
pass_count.append(index)
if "Level 1" in attribute.Profile:
if not finding.muted:
if (
not finding.muted
and index not in section_split_seen[section]["Level 1"]
):
section_split_seen[section]["Level 1"].add(index)
if finding.status == "FAIL":
sections[section]["Level 1"]["FAIL"] += 1
else:
sections[section]["Level 1"]["PASS"] += 1
elif "Level 2" in attribute.Profile:
if not finding.muted:
if (
not finding.muted
and index not in section_split_seen[section]["Level 2"]
):
section_split_seen[section]["Level 2"].add(index)
if finding.status == "FAIL":
sections[section]["Level 2"]["FAIL"] += 1
else:
@@ -65,7 +87,7 @@ def get_cis_table(
# Add results to table
sections = dict(sorted(sections.items()))
for section in sections:
cis_compliance_table["Provider"].append(compliance.Provider)
cis_compliance_table["Provider"].append(provider)
cis_compliance_table["Section"].append(section)
if sections[section]["Level 1"]["FAIL"] > 0:
cis_compliance_table["Level 1"].append(
+15 -6
View File
@@ -13,6 +13,8 @@ def get_ens_table(
compliance_overview: bool,
):
marcos = {}
marco_muted_seen = {}
provider = ""
ens_compliance_table = {
"Proveedor": [],
"Marco/Categoria": [],
@@ -31,6 +33,7 @@ def get_ens_table(
check_compliances = check.Compliance
for compliance in check_compliances:
if compliance.Framework == "ENS":
provider = compliance.Provider
for requirement in compliance.Requirements:
for attribute in requirement.Attributes:
marco_categoria = f"{attribute.Marco}/{attribute.Categoria}"
@@ -44,17 +47,23 @@ def get_ens_table(
"Bajo": 0,
"Muted": 0,
}
marco_muted_seen[marco_categoria] = set()
if finding.muted:
# Overview total: count each finding once per framework
if index not in muted_count:
muted_count.append(index)
# Per-marco Muted: count each finding once per marco
# it belongs to (a finding can map to several marcos).
if index not in marco_muted_seen[marco_categoria]:
marco_muted_seen[marco_categoria].add(index)
marcos[marco_categoria]["Muted"] += 1
else:
if finding.status == "FAIL":
if (
attribute.Tipo != "recomendacion"
and index not in fail_count
):
fail_count.append(index)
if attribute.Tipo != "recomendacion":
if index not in fail_count:
fail_count.append(index)
# Mark every marco the finding belongs to as
# NO CUMPLE, not just the first one seen.
marcos[marco_categoria][
"Estado"
] = f"{Fore.RED}NO CUMPLE{Style.RESET_ALL}"
@@ -71,7 +80,7 @@ def get_ens_table(
# Add results to table
for marco in sorted(marcos):
ens_compliance_table["Proveedor"].append(compliance.Provider)
ens_compliance_table["Proveedor"].append(provider)
ens_compliance_table["Marco/Categoria"].append(marco)
ens_compliance_table["Estado"].append(marcos[marco]["Estado"])
ens_compliance_table["Opcional"].append(
@@ -13,7 +13,9 @@ def get_kisa_ismsp_table(
compliance_overview: bool,
):
sections = {}
section_seen = {}
sections_status = {}
provider = ""
kisa_ismsp_compliance_table = {
"Provider": [],
"Section": [],
@@ -31,6 +33,7 @@ def get_kisa_ismsp_table(
compliance.Framework.startswith("KISA")
and compliance.Version in compliance_framework
):
provider = compliance.Provider
for requirement in compliance.Requirements:
for attribute in requirement.Attributes:
section = attribute.Section
@@ -43,16 +46,28 @@ def get_kisa_ismsp_table(
},
"Muted": 0,
}
section_seen[section] = set()
# Overview totals: count each finding once per framework
if finding.muted:
if index not in muted_count:
muted_count.append(index)
sections[section]["Muted"] += 1
else:
if finding.status == "FAIL" and index not in fail_count:
elif finding.status == "FAIL":
if index not in fail_count:
fail_count.append(index)
sections[section]["Status"]["FAIL"] += 1
elif finding.status == "PASS" and index not in pass_count:
elif finding.status == "PASS":
if index not in pass_count:
pass_count.append(index)
# Per-section counts: count each finding once per section
# it belongs to (a finding can map to several sections).
if index not in section_seen[section]:
section_seen[section].add(index)
if finding.muted:
sections[section]["Muted"] += 1
elif finding.status == "FAIL":
sections[section]["Status"]["FAIL"] += 1
elif finding.status == "PASS":
sections[section]["Status"]["PASS"] += 1
# Add results to table
@@ -70,7 +85,7 @@ def get_kisa_ismsp_table(
else:
sections_status[section] = f"{Fore.GREEN}PASS{Style.RESET_ALL}"
for section in sections:
kisa_ismsp_compliance_table["Provider"].append(compliance.Provider)
kisa_ismsp_compliance_table["Provider"].append(provider)
kisa_ismsp_compliance_table["Section"].append(section)
kisa_ismsp_compliance_table["Status"].append(sections_status[section])
kisa_ismsp_compliance_table["Muted"].append(
@@ -13,6 +13,8 @@ def get_mitre_attack_table(
compliance_overview: bool,
):
tactics = {}
tactic_seen = {}
provider = ""
mitre_compliance_table = {
"Provider": [],
"Tactic": [],
@@ -30,27 +32,38 @@ def get_mitre_attack_table(
"MITRE-ATTACK" in compliance.Framework
and compliance.Version in compliance_framework
):
provider = compliance.Provider
for requirement in compliance.Requirements:
for tactic in requirement.Tactics:
if tactic not in tactics:
tactics[tactic] = {"FAIL": 0, "PASS": 0, "Muted": 0}
tactic_seen[tactic] = set()
# Overview totals: count each finding once per framework
if finding.muted:
if index not in muted_count:
muted_count.append(index)
elif finding.status == "FAIL":
if index not in fail_count:
fail_count.append(index)
elif finding.status == "PASS":
if index not in pass_count:
pass_count.append(index)
# Per-tactic counts: count each finding once per tactic
# it belongs to (a finding can map to several tactics).
if index not in tactic_seen[tactic]:
tactic_seen[tactic].add(index)
if finding.muted:
tactics[tactic]["Muted"] += 1
else:
if finding.status == "FAIL":
if index not in fail_count:
fail_count.append(index)
tactics[tactic]["FAIL"] += 1
elif finding.status == "FAIL":
tactics[tactic]["FAIL"] += 1
elif finding.status == "PASS":
if index not in pass_count:
pass_count.append(index)
tactics[tactic]["PASS"] += 1
tactics[tactic]["PASS"] += 1
# Add results to table
tactics = dict(sorted(tactics.items()))
for tactic in tactics:
mitre_compliance_table["Provider"].append(compliance.Provider)
mitre_compliance_table["Provider"].append(provider)
mitre_compliance_table["Tactic"].append(tactic)
if tactics[tactic]["FAIL"] > 0:
mitre_compliance_table["Status"].append(
@@ -22,33 +22,47 @@ def get_okta_idaas_stig_table(
fail_count = []
muted_count = []
sections = {}
section_seen = {}
provider = ""
for index, finding in enumerate(findings):
check = bulk_checks_metadata[finding.check_metadata.CheckID]
check_compliances = check.Compliance
for compliance in check_compliances:
if compliance.Framework == "Okta-IDaaS-STIG":
provider = compliance.Provider
for requirement in compliance.Requirements:
for attribute in requirement.Attributes:
section = attribute.Section
if section not in sections:
sections[section] = {"FAIL": 0, "PASS": 0, "Muted": 0}
section_seen[section] = set()
# Overview totals: count each finding once per framework
if finding.muted:
if index not in muted_count:
muted_count.append(index)
sections[section]["Muted"] += 1
else:
if finding.status == "FAIL" and index not in fail_count:
elif finding.status == "FAIL":
if index not in fail_count:
fail_count.append(index)
sections[section]["FAIL"] += 1
elif finding.status == "PASS" and index not in pass_count:
elif finding.status == "PASS":
if index not in pass_count:
pass_count.append(index)
# Per-section counts: count each finding once per section
# it belongs to (a finding can map to several sections).
if index not in section_seen[section]:
section_seen[section].add(index)
if finding.muted:
sections[section]["Muted"] += 1
elif finding.status == "FAIL":
sections[section]["FAIL"] += 1
elif finding.status == "PASS":
sections[section]["PASS"] += 1
sections = dict(sorted(sections.items()))
for section in sections:
section_table["Provider"].append(compliance.Provider)
section_table["Provider"].append(provider)
section_table["Section"].append(section)
if sections[section]["FAIL"] > 0:
section_table["Status"].append(
@@ -24,6 +24,8 @@ def get_prowler_threatscore_table(
fail_count = []
muted_count = []
pillars = {}
pillar_seen = {}
provider = ""
generic_score = 0
max_generic_score = 0
counted_findings_generic = []
@@ -35,6 +37,7 @@ def get_prowler_threatscore_table(
check_compliances = check.Compliance
for compliance in check_compliances:
if compliance.Framework == "ProwlerThreatScore":
provider = compliance.Provider
for requirement in compliance.Requirements:
for attribute in requirement.Attributes:
pillar = attribute.Section
@@ -65,17 +68,28 @@ def get_prowler_threatscore_table(
if pillar not in pillars:
pillars[pillar] = {"FAIL": 0, "PASS": 0, "Muted": 0}
pillar_seen[pillar] = set()
# Overview totals: count each finding once per framework
if finding.muted:
if index not in muted_count:
muted_count.append(index)
pillars[pillar]["Muted"] += 1
else:
if finding.status == "FAIL" and index not in fail_count:
elif finding.status == "FAIL":
if index not in fail_count:
fail_count.append(index)
pillars[pillar]["FAIL"] += 1
elif finding.status == "PASS" and index not in pass_count:
elif finding.status == "PASS":
if index not in pass_count:
pass_count.append(index)
# Per-pillar counts: count each finding once per pillar
# it belongs to (a finding can map to several pillars).
if index not in pillar_seen[pillar]:
pillar_seen[pillar].add(index)
if finding.muted:
pillars[pillar]["Muted"] += 1
elif finding.status == "FAIL":
pillars[pillar]["FAIL"] += 1
elif finding.status == "PASS":
pillars[pillar]["PASS"] += 1
# Generic score
@@ -90,18 +104,21 @@ def get_prowler_threatscore_table(
counted_findings_generic.append(index)
no_findings_pillars = []
bulk_compliance = Compliance.get_bulk(provider=compliance.Provider.lower()).get(
compliance_framework
bulk_compliance = (
Compliance.get_bulk(provider=provider.lower()).get(compliance_framework)
if provider
else None
)
for requirement in bulk_compliance.Requirements:
for attribute in requirement.Attributes:
pillar = attribute.Section
if pillar not in pillars.keys() and pillar not in no_findings_pillars:
no_findings_pillars.append(pillar)
if bulk_compliance:
for requirement in bulk_compliance.Requirements:
for attribute in requirement.Attributes:
pillar = attribute.Section
if pillar not in pillars.keys() and pillar not in no_findings_pillars:
no_findings_pillars.append(pillar)
pillars = dict(sorted(pillars.items()))
for pillar in pillars:
pillar_table["Provider"].append(compliance.Provider)
pillar_table["Provider"].append(provider)
pillar_table["Pillar"].append(pillar)
if max_score_per_pillar[pillar] == 0:
pillar_score = 100.0
@@ -127,7 +144,7 @@ def get_prowler_threatscore_table(
)
for pillar in no_findings_pillars:
pillar_table["Provider"].append(compliance.Provider)
pillar_table["Provider"].append(provider)
pillar_table["Pillar"].append(pillar)
pillar_table["Score"].append(f"{Style.BRIGHT}{Fore.GREEN}100%{Style.RESET_ALL}")
pillar_table["Status"].append(f"{Fore.GREEN}PASS{Style.RESET_ALL}")
@@ -163,6 +163,7 @@ def _render_grouped(
"""Grouped mode: one row per group with pass/fail counts."""
check_map = _build_requirement_check_map(framework, provider)
groups = {}
group_seen = {}
pass_count = []
fail_count = []
muted_count = []
@@ -176,17 +177,28 @@ def _render_grouped(
for group_key in _get_group_key(req, group_by):
if group_key not in groups:
groups[group_key] = {"FAIL": 0, "PASS": 0, "Muted": 0}
group_seen[group_key] = set()
# Overview totals: count each finding once per framework
if finding.muted:
if index not in muted_count:
muted_count.append(index)
groups[group_key]["Muted"] += 1
else:
if finding.status == "FAIL" and index not in fail_count:
elif finding.status == "FAIL":
if index not in fail_count:
fail_count.append(index)
groups[group_key]["FAIL"] += 1
elif finding.status == "PASS" and index not in pass_count:
elif finding.status == "PASS":
if index not in pass_count:
pass_count.append(index)
# Per-group counts: count each finding once per group it belongs
# to (a finding can map to several groups via several requirements).
if index not in group_seen[group_key]:
group_seen[group_key].add(index)
if finding.muted:
groups[group_key]["Muted"] += 1
elif finding.status == "FAIL":
groups[group_key]["FAIL"] += 1
elif finding.status == "PASS":
groups[group_key]["PASS"] += 1
if not _print_overview(
@@ -258,6 +270,8 @@ def _render_split(
split_field = split_by.field
split_values = split_by.values
groups = {}
group_muted_seen = {}
group_split_seen = {}
pass_count = []
fail_count = []
muted_count = []
@@ -274,12 +288,19 @@ def _render_split(
sv: {"FAIL": 0, "PASS": 0} for sv in split_values
}
groups[group_key]["Muted"] = 0
group_muted_seen[group_key] = set()
group_split_seen[group_key] = {sv: set() for sv in split_values}
split_val = req.attributes.get(split_field, "")
if finding.muted:
# Overview total: count each finding once per framework
if index not in muted_count:
muted_count.append(index)
# Per-group Muted: count each finding once per group it
# belongs to (a finding can map to several groups).
if index not in group_muted_seen[group_key]:
group_muted_seen[group_key].add(index)
groups[group_key]["Muted"] += 1
else:
if finding.status == "FAIL" and index not in fail_count:
@@ -289,7 +310,8 @@ def _render_split(
for sv in split_values:
if sv in str(split_val):
if not finding.muted:
if index not in group_split_seen[group_key][sv]:
group_split_seen[group_key][sv].add(index)
if finding.status == "FAIL":
groups[group_key][sv]["FAIL"] += 1
else:
@@ -364,6 +386,7 @@ def _render_scored(
risk_field = scoring.risk_field
weight_field = scoring.weight_field
groups = {}
group_seen = {}
pass_count = []
fail_count = []
muted_count = []
@@ -388,6 +411,7 @@ def _render_scored(
if group_key not in groups:
groups[group_key] = {"FAIL": 0, "PASS": 0, "Muted": 0}
group_seen[group_key] = set()
score_per_group[group_key] = 0
max_score_per_group[group_key] = 0
counted_per_group[group_key] = []
@@ -398,16 +422,26 @@ def _render_scored(
max_score_per_group[group_key] += risk * weight
counted_per_group[group_key].append(index)
# Overview totals: count each finding once per framework
if finding.muted:
if index not in muted_count:
muted_count.append(index)
groups[group_key]["Muted"] += 1
else:
if finding.status == "FAIL" and index not in fail_count:
elif finding.status == "FAIL":
if index not in fail_count:
fail_count.append(index)
groups[group_key]["FAIL"] += 1
elif finding.status == "PASS" and index not in pass_count:
elif finding.status == "PASS":
if index not in pass_count:
pass_count.append(index)
# Per-group counts: count each finding once per group it belongs
# to (a finding can map to several groups via several requirements).
if index not in group_seen[group_key]:
group_seen[group_key].add(index)
if finding.muted:
groups[group_key]["Muted"] += 1
elif finding.status == "FAIL":
groups[group_key]["FAIL"] += 1
elif finding.status == "PASS":
groups[group_key]["PASS"] += 1
if index not in counted_generic and not finding.muted:
@@ -0,0 +1,132 @@
import re
from types import SimpleNamespace
from prowler.lib.outputs.compliance.asd_essential_eight.asd_essential_eight import (
get_asd_essential_eight_table,
)
def _make_finding(check_id, status="PASS", muted=False):
return SimpleNamespace(
check_metadata=SimpleNamespace(CheckID=check_id),
status=status,
muted=muted,
)
def _make_compliance(provider, sections, framework="ASD-Essential-Eight"):
"""Build a per-check compliance covering the given sections."""
return SimpleNamespace(
Framework=framework,
Provider=provider,
Requirements=[
SimpleNamespace(Attributes=[SimpleNamespace(Section=section)])
for section in sections
],
)
class TestASDEssentialEightTable:
"""Test cases verifying multi-section counting and provider-column attribution for the ASD Essential Eight compliance table."""
def test_multi_section_fail_not_undercounted(self, capsys, tmp_path):
"""A single FAIL check mapped to several sections must show FAIL(1) in
every section, not just the first one seen."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[_make_compliance("aws", ["IAM", "Logging"])]
),
"check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]),
}
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
get_asd_essential_eight_table(
findings,
bulk_metadata,
"asd_essential_eight_aws",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
# Both IAM and Logging must report FAIL(1); before the fix Logging was
# undercounted because the per-section count was gated by the global
# dedup list.
assert captured.out.count("FAIL(1)") == 2
def test_multi_section_muted_not_undercounted(self, capsys, tmp_path):
"""A single MUTED check mapped to several sections must increase the
per-section Muted count in every section, not only the first one."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[_make_compliance("aws", ["IAM", "Logging"])]
),
"check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]),
}
findings = [
_make_finding("check_a", "FAIL", muted=True),
# A real FAIL is needed so the results table is rendered at all.
_make_finding("check_b", "FAIL"),
]
get_asd_essential_eight_table(
findings,
bulk_metadata,
"asd_essential_eight_aws",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out)
# The muted check belongs to both IAM and Logging, so the Muted column
# must read 1 in both rows.
muted_cells = re.findall(r"\s*1\s*│\s*$", plain, flags=re.MULTILINE)
assert len(muted_cells) == 2
def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path):
"""The Provider column must come from the matched ASD-Essential-Eight
compliance, never from a different framework that happens to be the last
entry in the check's compliance list."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[
_make_compliance("aws", ["IAM"]),
_make_compliance(
"leaked_provider", ["Other"], framework="OtherFramework"
),
]
),
"check_b": SimpleNamespace(
Compliance=[
_make_compliance("aws", ["IAM"]),
_make_compliance(
"leaked_provider", ["Other"], framework="OtherFramework"
),
]
),
}
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
get_asd_essential_eight_table(
findings,
bulk_metadata,
"asd_essential_eight_aws",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
assert "aws" in captured.out
# The provider of the unrelated trailing framework must NOT leak into
# the rendered table.
assert "leaked_provider" not in captured.out
@@ -0,0 +1,130 @@
import re
from types import SimpleNamespace
from prowler.lib.outputs.compliance.c5.c5 import get_c5_table
def _make_finding(check_id, status="PASS", muted=False):
return SimpleNamespace(
check_metadata=SimpleNamespace(CheckID=check_id),
status=status,
muted=muted,
)
def _make_compliance(provider, sections, framework="C5"):
"""Build a per-check compliance covering the given sections."""
return SimpleNamespace(
Framework=framework,
Provider=provider,
Requirements=[
SimpleNamespace(Attributes=[SimpleNamespace(Section=section)])
for section in sections
],
)
class TestC5Table:
"""Verify multi-section counting and provider-column attribution for the compliance table."""
def test_multi_section_fail_not_undercounted(self, capsys, tmp_path):
"""A single FAIL check mapped to several sections must show FAIL(1) in
every section, not just the first one seen."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[_make_compliance("aws", ["IAM", "Logging"])]
),
"check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]),
}
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
get_c5_table(
findings,
bulk_metadata,
"c5_aws",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
# Both IAM and Logging must report FAIL(1); before the fix Logging was
# undercounted because the per-section count was gated by the global
# dedup list.
assert captured.out.count("FAIL(1)") == 2
def test_multi_section_muted_not_undercounted(self, capsys, tmp_path):
"""A single MUTED check mapped to several sections must increase the
per-section Muted count in every section, not only the first one."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[_make_compliance("aws", ["IAM", "Logging"])]
),
"check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]),
}
findings = [
_make_finding("check_a", "FAIL", muted=True),
# A real FAIL is needed so the results table is rendered at all.
_make_finding("check_b", "FAIL"),
]
get_c5_table(
findings,
bulk_metadata,
"c5_aws",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out)
# The muted check belongs to both IAM and Logging, so the Muted column
# must read 1 in both rows.
muted_cells = re.findall(r"\s*1\s*│\s*$", plain, flags=re.MULTILINE)
assert len(muted_cells) == 2
def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path):
"""The Provider column must come from the matched C5 compliance, never
from a different framework that happens to be the last entry in the
check's compliance list."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[
_make_compliance("aws", ["IAM"]),
_make_compliance(
"leaked_provider", ["Other"], framework="OtherFramework"
),
]
),
"check_b": SimpleNamespace(
Compliance=[
_make_compliance("aws", ["IAM"]),
_make_compliance(
"leaked_provider", ["Other"], framework="OtherFramework"
),
]
),
}
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
get_c5_table(
findings,
bulk_metadata,
"c5_aws",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
assert "aws" in captured.out
# The provider of the unrelated trailing framework must NOT leak into
# the rendered table.
assert "leaked_provider" not in captured.out
@@ -0,0 +1,130 @@
import re
from types import SimpleNamespace
from prowler.lib.outputs.compliance.ccc.ccc import get_ccc_table
def _make_finding(check_id, status="PASS", muted=False):
return SimpleNamespace(
check_metadata=SimpleNamespace(CheckID=check_id),
status=status,
muted=muted,
)
def _make_compliance(provider, sections, framework="CCC"):
"""Build a per-check compliance covering the given sections."""
return SimpleNamespace(
Framework=framework,
Provider=provider,
Requirements=[
SimpleNamespace(Attributes=[SimpleNamespace(Section=section)])
for section in sections
],
)
class TestCCCTable:
"""Test cases verifying multi-section counting and provider-column attribution for the compliance table."""
def test_multi_section_fail_not_undercounted(self, capsys, tmp_path):
"""A single FAIL check mapped to several sections must show FAIL(1) in
every section, not just the first one seen."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[_make_compliance("aws", ["IAM", "Logging"])]
),
"check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]),
}
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
get_ccc_table(
findings,
bulk_metadata,
"ccc_aws",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
# Both IAM and Logging must report FAIL(1); before the fix Logging was
# undercounted because the per-section count was gated by the global
# dedup list.
assert captured.out.count("FAIL(1)") == 2
def test_multi_section_muted_not_undercounted(self, capsys, tmp_path):
"""A single MUTED check mapped to several sections must increase the
per-section Muted count in every section, not only the first one."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[_make_compliance("aws", ["IAM", "Logging"])]
),
"check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]),
}
findings = [
_make_finding("check_a", "FAIL", muted=True),
# A real FAIL is needed so the results table is rendered at all.
_make_finding("check_b", "FAIL"),
]
get_ccc_table(
findings,
bulk_metadata,
"ccc_aws",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out)
# The muted check belongs to both IAM and Logging, so the Muted column
# must read 1 in both rows.
muted_cells = re.findall(r"\s*1\s*│\s*$", plain, flags=re.MULTILINE)
assert len(muted_cells) == 2
def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path):
"""The Provider column must come from the matched CCC compliance, never
from a different framework that happens to be the last entry in the
check's compliance list."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[
_make_compliance("aws", ["IAM"]),
_make_compliance(
"leaked_provider", ["Other"], framework="OtherFramework"
),
]
),
"check_b": SimpleNamespace(
Compliance=[
_make_compliance("aws", ["IAM"]),
_make_compliance(
"leaked_provider", ["Other"], framework="OtherFramework"
),
]
),
}
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
get_ccc_table(
findings,
bulk_metadata,
"ccc_aws",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
assert "aws" in captured.out
# The provider of the unrelated trailing framework must NOT leak into
# the rendered table.
assert "leaked_provider" not in captured.out
@@ -0,0 +1,162 @@
import re
from types import SimpleNamespace
from prowler.lib.outputs.compliance.cis.cis import get_cis_table
def _strip_ansi(text):
return re.sub(r"\x1b\[[0-9;]*m", "", text)
def _make_finding(check_id, status="PASS", muted=False):
return SimpleNamespace(
check_metadata=SimpleNamespace(CheckID=check_id),
status=status,
muted=muted,
)
def _attr(section, profile="Level 1"):
return SimpleNamespace(Section=section, Profile=profile)
def _make_compliance(provider, attributes, version="1.4", framework="CIS"):
"""Build a per-check CIS compliance with the given (section, profile) attrs."""
return SimpleNamespace(
Framework=framework,
Version=version,
Provider=provider,
Requirements=[SimpleNamespace(Attributes=attributes)],
)
def _make_compliance_multi_req(provider, attributes, version="1.4", framework="CIS"):
"""Build a per-check CIS compliance where each attr is its own requirement,
simulating a check that appears in several requirements."""
return SimpleNamespace(
Framework=framework,
Version=version,
Provider=provider,
Requirements=[SimpleNamespace(Attributes=[attr]) for attr in attributes],
)
class TestCISTable:
"""Verify multi-section counting and provider-column attribution for the CIS compliance table."""
def test_muted_multi_section_not_undercounted(self, capsys, tmp_path):
"""A single MUTED finding mapped to several sections must increment the
per-section Muted column for every section, not only the first seen.
CIS counts FAIL/PASS through Level 1/Level 2 buckets, so only the Muted
per-section count was affected by the undercount bug.
"""
bulk_metadata = {
# check_a is muted and belongs to two sections at once.
"check_a": SimpleNamespace(
Compliance=[
_make_compliance("aws", [_attr("1 IAM"), _attr("2 Logging")])
]
),
# A real (non-muted) finding so the table is rendered.
"check_b": SimpleNamespace(
Compliance=[_make_compliance("aws", [_attr("1 IAM")])]
),
}
findings = [
_make_finding("check_a", "FAIL", muted=True),
_make_finding("check_b", "PASS"),
]
get_cis_table(
findings,
bulk_metadata,
"cis_1.4_aws",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
plain = _strip_ansi(captured.out)
# Both section rows must carry a Muted count of 1 in their last cell.
# Before the fix only the first section seen got incremented.
muted_one_rows = re.findall(r"\s*1\s*│\s*$", plain, flags=re.MULTILINE)
assert len(muted_one_rows) == 2
def test_same_section_level_not_double_counted(self, capsys, tmp_path):
"""A single finding whose check maps to several requirements that share
the same section and profile must count once for that section/level,
not once per requirement (FAIL(1), never FAIL(2))."""
bulk_metadata = {
# check_a is a single FAIL mapped to two requirements, both in the
# same section "1 IAM" and the same profile "Level 1".
"check_a": SimpleNamespace(
Compliance=[
_make_compliance_multi_req("aws", [_attr("1 IAM"), _attr("1 IAM")])
]
),
# A second finding in another section so the table renders.
"check_b": SimpleNamespace(
Compliance=[_make_compliance("aws", [_attr("2 Logging")])]
),
}
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
get_cis_table(
findings,
bulk_metadata,
"cis_1.4_aws",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
plain = _strip_ansi(captured.out)
# The "1 IAM" row must show FAIL(1) for Level 1, never FAIL(2).
assert "FAIL(1)" in plain
assert "FAIL(2)" not in plain
def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path):
"""The Provider column must come from the matched CIS compliance, not
from a different framework that trails it in the compliance list."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[
_make_compliance("aws", [_attr("1 IAM")]),
_make_compliance(
"gcp", [_attr("Other")], framework="OtherFramework"
),
]
),
"check_b": SimpleNamespace(
Compliance=[
_make_compliance("aws", [_attr("1 IAM")]),
_make_compliance(
"gcp", [_attr("Other")], framework="OtherFramework"
),
]
),
}
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
get_cis_table(
findings,
bulk_metadata,
"cis_1.4_aws",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
assert "aws" in captured.out
# The trailing unrelated framework's provider must not leak in.
assert "gcp" not in captured.out
@@ -0,0 +1,235 @@
import re
from types import SimpleNamespace
from prowler.lib.outputs.compliance.ens.ens import get_ens_table
def _strip_ansi(text):
return re.sub(r"\x1b\[[0-9;]*m", "", text)
def _make_finding(check_id, status="PASS", muted=False):
return SimpleNamespace(
check_metadata=SimpleNamespace(CheckID=check_id),
status=status,
muted=muted,
)
def _attr(marco, categoria, tipo="requisito", nivel="alto"):
return SimpleNamespace(Marco=marco, Categoria=categoria, Tipo=tipo, Nivel=nivel)
def _make_compliance(provider, attributes, framework="ENS"):
"""Build a per-check ENS compliance with the given marco/categoria attrs."""
return SimpleNamespace(
Framework=framework,
Provider=provider,
Requirements=[SimpleNamespace(Attributes=attributes)],
)
class TestENSTable:
"""Test cases for ENS compliance table rendering.
Verify multi-marco counting and provider-column attribution for the
compliance table.
"""
def test_no_cumple_marked_in_every_marco(self, capsys, tmp_path):
"""A single failing finding mapped to several marcos must mark every
one of them as NO CUMPLE, not only the first marco seen."""
bulk_metadata = {
# check_a fails and belongs to two distinct marcos/categorias.
"check_a": SimpleNamespace(
Compliance=[
_make_compliance(
"aws",
[
_attr("operacional", "control de acceso"),
_attr("organizativo", "politica de seguridad"),
],
)
]
),
# A passing finding so the overview total reaches 2.
"check_b": SimpleNamespace(
Compliance=[
_make_compliance("aws", [_attr("operacional", "control de acceso")])
]
),
}
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
get_ens_table(
findings,
bulk_metadata,
"ens_rd2022_aws",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
plain = _strip_ansi(captured.out)
# Both marco rows the failing finding maps to must read NO CUMPLE.
# Before the fix only the first marco was marked, the second stayed
# CUMPLE. Anchor the assertion to the actual marco rows (not the
# overview header line which also mentions NO CUMPLE).
op_row = [
line
for line in plain.splitlines()
if "operacional/control de acceso" in line
]
org_row = [
line
for line in plain.splitlines()
if "organizativo/politica de seguridad" in line
]
assert len(op_row) == 1 and "NO CUMPLE" in op_row[0]
assert len(org_row) == 1 and "NO CUMPLE" in org_row[0]
def test_recomendacion_does_not_set_no_cumple(self, capsys, tmp_path):
"""A FAIL on a 'recomendacion' attribute must not flip a marco to
NO CUMPLE (this path is intentionally excluded from the fix)."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[
_make_compliance(
"aws",
[
_attr(
"operacional", "control de acceso", tipo="recomendacion"
)
],
)
]
),
"check_b": SimpleNamespace(
Compliance=[
_make_compliance("aws", [_attr("organizativo", "politica")])
]
),
# A regular (non-recomendacion) check so the results table renders
# at least one marco row and the assertion below is not vacuous.
"check_c": SimpleNamespace(
Compliance=[
_make_compliance("aws", [_attr("operacional", "continuidad")])
]
),
}
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
_make_finding("check_c", "PASS"),
]
get_ens_table(
findings,
bulk_metadata,
"ens_rd2022_aws",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
plain = _strip_ansi(captured.out)
# The recomendacion FAIL must not appear as a NO CUMPLE marco row in the
# results table (the overview header line is allowed to mention it).
marco_rows = [
line
for line in plain.splitlines()
if "operacional" in line or "organizativo" in line
]
# Guard against a vacuous pass: the table must actually render rows.
assert marco_rows
assert all("NO CUMPLE" not in line for line in marco_rows)
def test_muted_multi_marco_not_undercounted(self, capsys, tmp_path):
"""A single MUTED finding mapped to several marcos must increment the
per-marco Muted column for every marco, not only the first seen."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[
_make_compliance(
"aws",
[
_attr("operacional", "control de acceso"),
_attr("organizativo", "politica de seguridad"),
],
)
]
),
"check_b": SimpleNamespace(
Compliance=[
_make_compliance("aws", [_attr("operacional", "control de acceso")])
]
),
}
findings = [
_make_finding("check_a", "FAIL", muted=True),
_make_finding("check_b", "FAIL"),
]
get_ens_table(
findings,
bulk_metadata,
"ens_rd2022_aws",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
plain = _strip_ansi(captured.out)
# Both marco rows the muted finding maps to must report a Muted count of
# 1 in their last cell.
muted_one_rows = re.findall(r"\s*1\s*│\s*$", plain, flags=re.MULTILINE)
assert len(muted_one_rows) == 2
def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path):
"""The Proveedor column must come from the matched ENS compliance, not
from a different framework that trails it in the compliance list."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[
_make_compliance(
"aws", [_attr("operacional", "control de acceso")]
),
_make_compliance(
"gcp", [_attr("x", "y")], framework="OtherFramework"
),
]
),
"check_b": SimpleNamespace(
Compliance=[
_make_compliance(
"aws", [_attr("operacional", "control de acceso")]
),
_make_compliance(
"gcp", [_attr("x", "y")], framework="OtherFramework"
),
]
),
}
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
get_ens_table(
findings,
bulk_metadata,
"ens_rd2022_aws",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
assert "aws" in captured.out
assert "gcp" not in captured.out
@@ -0,0 +1,137 @@
import re
from types import SimpleNamespace
from prowler.lib.outputs.compliance.kisa_ismsp.kisa_ismsp import get_kisa_ismsp_table
# The generator matches a compliance when its Framework starts with "KISA" and
# its Version is contained in the compliance_framework argument.
COMPLIANCE_FRAMEWORK = "kisa-isms-p-2023_aws"
def _make_finding(check_id, status="PASS", muted=False):
return SimpleNamespace(
check_metadata=SimpleNamespace(CheckID=check_id),
status=status,
muted=muted,
)
def _make_compliance(
provider, sections, framework="KISA-ISMS-P", version="kisa-isms-p-2023"
):
"""Build a per-check compliance covering the given sections."""
return SimpleNamespace(
Framework=framework,
Version=version,
Provider=provider,
Requirements=[
SimpleNamespace(Attributes=[SimpleNamespace(Section=section)])
for section in sections
],
)
class TestKISAISMSPTable:
"""Verify multi-section counting and provider-column attribution for the KISA ISMS-P compliance table."""
def test_multi_section_fail_not_undercounted(self, capsys, tmp_path):
"""A single FAIL check mapped to several sections must show FAIL(1) in
every section, not just the first one seen."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[_make_compliance("aws", ["IAM", "Logging"])]
),
"check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]),
}
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
get_kisa_ismsp_table(
findings,
bulk_metadata,
COMPLIANCE_FRAMEWORK,
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
# Both IAM and Logging must report FAIL(1); before the fix Logging was
# undercounted because the per-section count was gated by the global
# dedup list.
assert captured.out.count("FAIL(1)") == 2
def test_multi_section_muted_not_undercounted(self, capsys, tmp_path):
"""A single MUTED check mapped to several sections must increase the
per-section Muted count in every section, not only the first one."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[_make_compliance("aws", ["IAM", "Logging"])]
),
"check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]),
}
findings = [
_make_finding("check_a", "FAIL", muted=True),
# A real FAIL is needed so the results table is rendered at all.
_make_finding("check_b", "FAIL"),
]
get_kisa_ismsp_table(
findings,
bulk_metadata,
COMPLIANCE_FRAMEWORK,
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out)
# The muted check belongs to both IAM and Logging, so the Muted column
# must read 1 in both rows.
muted_cells = re.findall(r"\s*1\s*│\s*$", plain, flags=re.MULTILINE)
assert len(muted_cells) == 2
def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path):
"""The Provider column must come from the matched KISA compliance, never
from a different framework that happens to be the last entry in the
check's compliance list."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[
_make_compliance("aws", ["IAM"]),
_make_compliance(
"leaked_provider", ["Other"], framework="OtherFramework"
),
]
),
"check_b": SimpleNamespace(
Compliance=[
_make_compliance("aws", ["IAM"]),
_make_compliance(
"leaked_provider", ["Other"], framework="OtherFramework"
),
]
),
}
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
get_kisa_ismsp_table(
findings,
bulk_metadata,
COMPLIANCE_FRAMEWORK,
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
assert "aws" in captured.out
# The provider of the unrelated trailing framework must NOT leak into
# the rendered table.
assert "leaked_provider" not in captured.out
@@ -0,0 +1,140 @@
import re
from types import SimpleNamespace
from prowler.lib.outputs.compliance.mitre_attack.mitre_attack import (
get_mitre_attack_table,
)
# The generator matches a compliance when "MITRE-ATTACK" is in its Framework and
# its Version is contained in the compliance_framework argument.
COMPLIANCE_FRAMEWORK = "mitre_attack_aws"
def _make_finding(check_id, status="PASS", muted=False):
return SimpleNamespace(
check_metadata=SimpleNamespace(CheckID=check_id),
status=status,
muted=muted,
)
def _make_compliance(
provider, tactics, framework="MITRE-ATTACK", version="mitre_attack"
):
"""Build a per-check compliance covering the given tactics."""
return SimpleNamespace(
Framework=framework,
Version=version,
Provider=provider,
Requirements=[SimpleNamespace(Tactics=tactics)],
)
class TestMitreAttackTable:
"""Test multi-section counting and provider-column attribution for the compliance table."""
def test_multi_tactic_fail_not_undercounted(self, capsys, tmp_path):
"""A single FAIL check mapped to several tactics must show FAIL(1) in
every tactic, not just the first one seen."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[_make_compliance("aws", ["Persistence", "Execution"])]
),
"check_b": SimpleNamespace(
Compliance=[_make_compliance("aws", ["Persistence"])]
),
}
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
get_mitre_attack_table(
findings,
bulk_metadata,
COMPLIANCE_FRAMEWORK,
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
# Both Persistence and Execution must report FAIL(1); before the fix
# Execution was undercounted because the per-tactic count was gated by
# the global dedup list.
assert captured.out.count("FAIL(1)") == 2
def test_multi_tactic_muted_not_undercounted(self, capsys, tmp_path):
"""A single MUTED check mapped to several tactics must increase the
per-tactic Muted count in every tactic, not only the first one."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[_make_compliance("aws", ["Persistence", "Execution"])]
),
"check_b": SimpleNamespace(
Compliance=[_make_compliance("aws", ["Persistence"])]
),
}
findings = [
_make_finding("check_a", "FAIL", muted=True),
# A second finding is needed so the table is rendered at all.
_make_finding("check_b", "FAIL"),
]
get_mitre_attack_table(
findings,
bulk_metadata,
COMPLIANCE_FRAMEWORK,
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out)
# The muted check belongs to both Persistence and Execution, so the
# Muted column must read 1 in both rows.
muted_cells = re.findall(r"\s*1\s*│\s*$", plain, flags=re.MULTILINE)
assert len(muted_cells) == 2
def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path):
"""The Provider column must come from the matched MITRE-ATTACK
compliance, never from a different framework that happens to be the last
entry in the check's compliance list."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[
_make_compliance("aws", ["Persistence"]),
_make_compliance(
"leaked_provider", ["Other"], framework="OtherFramework"
),
]
),
"check_b": SimpleNamespace(
Compliance=[
_make_compliance("aws", ["Persistence"]),
_make_compliance(
"leaked_provider", ["Other"], framework="OtherFramework"
),
]
),
}
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
get_mitre_attack_table(
findings,
bulk_metadata,
COMPLIANCE_FRAMEWORK,
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
assert "aws" in captured.out
# The provider of the unrelated trailing framework must NOT leak into
# the rendered table.
assert "leaked_provider" not in captured.out
@@ -0,0 +1,136 @@
from types import SimpleNamespace
from prowler.lib.outputs.compliance.okta_idaas_stig.okta_idaas_stig import (
get_okta_idaas_stig_table,
)
def _make_finding(check_id, status="PASS", muted=False):
return SimpleNamespace(
check_metadata=SimpleNamespace(CheckID=check_id),
status=status,
muted=muted,
)
def _make_compliance(provider, sections, framework="Okta-IDaaS-STIG"):
"""Build a per-check compliance covering the given sections."""
return SimpleNamespace(
Framework=framework,
Provider=provider,
Requirements=[
SimpleNamespace(Attributes=[SimpleNamespace(Section=section)])
for section in sections
],
)
class TestOktaIDaaSSTIGTable:
"""Test cases for Okta IDaaS STIG compliance table rendering."""
def test_multi_section_fail_not_undercounted(self, capsys, tmp_path):
"""A single FAIL check mapped to several sections must show FAIL(1) in
every section, not just the first one seen."""
bulk_metadata = {
# check_a belongs to two sections at once.
"check_a": SimpleNamespace(
Compliance=[_make_compliance("okta", ["IAM", "Logging"])]
),
"check_b": SimpleNamespace(Compliance=[_make_compliance("okta", ["IAM"])]),
}
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
get_okta_idaas_stig_table(
findings,
bulk_metadata,
"okta_idaas_stig_1r2",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
# Both IAM and Logging must report FAIL(1); before the fix Logging
# was undercounted and rendered as plain PASS.
assert captured.out.count("FAIL(1)") == 2
def test_multi_section_muted_not_undercounted(self, capsys, tmp_path):
"""A single MUTED check mapped to several sections must increase the
per-section Muted count in every section, not only the first one."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[_make_compliance("okta", ["IAM", "Logging"])]
),
"check_b": SimpleNamespace(Compliance=[_make_compliance("okta", ["IAM"])]),
}
findings = [
_make_finding("check_a", "FAIL", muted=True),
# A real FAIL is needed so the results table is rendered at all.
_make_finding("check_b", "FAIL"),
]
get_okta_idaas_stig_table(
findings,
bulk_metadata,
"okta_idaas_stig_1r2",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
# The muted check belongs to both IAM and Logging, so the Muted column
# must read 1 in both rows. Before the fix only the first section seen
# was incremented, leaving the second at 0.
# Strip ANSI color codes before counting the bare values per row.
import re
plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out)
# Each section row ends with its Muted value in its own cell; both rows
# must carry a Muted count of 1.
muted_cells = re.findall(r"\s*1\s*│\s*$", plain, flags=re.MULTILINE)
assert len(muted_cells) == 2
def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path):
"""The Provider column must come from the matched Okta-IDaaS-STIG
compliance, never from a different framework that happens to be the
last entry in the check's compliance list."""
# check_a maps to Okta-IDaaS-STIG (provider "okta") but its compliance
# list ends with a *different* framework whose provider is "aws". With
# the bug the leaked loop variable made the table render "aws".
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[
_make_compliance("okta", ["IAM"]),
_make_compliance("aws", ["Other"], framework="OtherFramework"),
]
),
"check_b": SimpleNamespace(
Compliance=[
_make_compliance("okta", ["IAM"]),
_make_compliance("aws", ["Other"], framework="OtherFramework"),
]
),
}
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
get_okta_idaas_stig_table(
findings,
bulk_metadata,
"okta_idaas_stig_1r2",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
assert "okta" in captured.out
# The provider of the unrelated trailing framework must NOT leak into
# the rendered table.
assert "aws" not in captured.out
@@ -0,0 +1,147 @@
import re
from types import SimpleNamespace
from unittest import mock
from prowler.lib.outputs.compliance.prowler_threatscore.prowler_threatscore import (
get_prowler_threatscore_table,
)
# Patch target for the Compliance.get_bulk lookup used to render pillars without
# findings; the tests don't exercise that path so it returns nothing.
COMPLIANCE_PATH = (
"prowler.lib.outputs.compliance.prowler_threatscore.prowler_threatscore.Compliance"
)
def _make_finding(check_id, status="PASS", muted=False):
return SimpleNamespace(
check_metadata=SimpleNamespace(CheckID=check_id),
status=status,
muted=muted,
)
def _make_compliance(provider, pillars, framework="ProwlerThreatScore"):
"""Build a per-check compliance covering the given pillars (Section)."""
return SimpleNamespace(
Framework=framework,
Provider=provider,
Requirements=[
SimpleNamespace(
Attributes=[SimpleNamespace(Section=pillar, LevelOfRisk=5, Weight=100)]
)
for pillar in pillars
],
)
class TestProwlerThreatScoreTable:
"""Verify multi-section counting and provider-column attribution for the compliance table."""
def test_multi_pillar_fail_not_undercounted(self, capsys, tmp_path):
"""A single FAIL check mapped to several pillars must show FAIL(1) in
every pillar, not just the first one seen."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[_make_compliance("aws", ["IAM", "Encryption"])]
),
"check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]),
}
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
with mock.patch(COMPLIANCE_PATH) as compliance_mock:
compliance_mock.get_bulk.return_value = {}
get_prowler_threatscore_table(
findings,
bulk_metadata,
"prowler_threatscore_aws",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
# Both IAM and Encryption must report FAIL(1); before the fix Encryption
# was undercounted because the per-pillar count was gated by the global
# dedup list.
assert captured.out.count("FAIL(1)") == 2
def test_multi_pillar_muted_not_undercounted(self, capsys, tmp_path):
"""A single MUTED check mapped to several pillars must increase the
per-pillar Muted count in every pillar, not only the first one."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[_make_compliance("aws", ["IAM", "Encryption"])]
),
"check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]),
}
findings = [
_make_finding("check_a", "FAIL", muted=True),
# A real FAIL is needed so the results table is rendered at all.
_make_finding("check_b", "FAIL"),
]
with mock.patch(COMPLIANCE_PATH) as compliance_mock:
compliance_mock.get_bulk.return_value = {}
get_prowler_threatscore_table(
findings,
bulk_metadata,
"prowler_threatscore_aws",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out)
# The muted check belongs to both IAM and Encryption, so the Muted
# column must read 1 in both rows.
muted_cells = re.findall(r"\s*1\s*│\s*$", plain, flags=re.MULTILINE)
assert len(muted_cells) == 2
def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path):
"""The Provider column must come from the matched ProwlerThreatScore
compliance, never from a different framework that happens to be the last
entry in the check's compliance list."""
bulk_metadata = {
"check_a": SimpleNamespace(
Compliance=[
_make_compliance("aws", ["IAM"]),
_make_compliance(
"leaked_provider", ["Other"], framework="OtherFramework"
),
]
),
"check_b": SimpleNamespace(
Compliance=[
_make_compliance("aws", ["IAM"]),
_make_compliance(
"leaked_provider", ["Other"], framework="OtherFramework"
),
]
),
}
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
with mock.patch(COMPLIANCE_PATH) as compliance_mock:
compliance_mock.get_bulk.return_value = {}
get_prowler_threatscore_table(
findings,
bulk_metadata,
"prowler_threatscore_aws",
"output",
str(tmp_path),
False,
)
captured = capsys.readouterr()
assert "aws" in captured.out
# The provider of the unrelated trailing framework must NOT leak into
# the rendered table.
assert "leaked_provider" not in captured.out
@@ -1,3 +1,4 @@
import re
from types import SimpleNamespace
from unittest.mock import MagicMock
@@ -26,6 +27,10 @@ def _make_finding(check_id, status="PASS", muted=False):
return finding
def _strip_ansi(text):
return re.sub(r"\x1b\[[0-9;]*m", "", text)
def _make_framework(requirements, table_config, provider="AWS"):
return ComplianceFramework(
framework="TestFW",
@@ -39,6 +44,8 @@ def _make_framework(requirements, table_config, provider="AWS"):
class TestBuildRequirementCheckMap:
"""Test cases for building the requirement-to-check map of a framework."""
def test_basic(self):
reqs = [
UniversalComplianceRequirement(
@@ -103,6 +110,8 @@ class TestBuildRequirementCheckMap:
class TestGetGroupKey:
"""Test cases for resolving the group key of a requirement."""
def test_normal_field(self):
req = UniversalComplianceRequirement(
id="1.1",
@@ -124,7 +133,9 @@ class TestGetGroupKey:
class TestGroupedMode:
def test_grouped_rendering(self, capsys):
"""Test cases for grouped-mode universal compliance table rendering."""
def test_grouped_rendering(self, capsys, tmp_path):
reqs = [
UniversalComplianceRequirement(
id="1.1",
@@ -156,7 +167,7 @@ class TestGroupedMode:
bulk_metadata,
"test_fw",
"output",
"/tmp",
str(tmp_path),
False,
framework=fw,
)
@@ -167,9 +178,118 @@ class TestGroupedMode:
assert "PASS" in captured.out
assert "FAIL" in captured.out
def test_grouped_multi_section_no_undercount(self, capsys, tmp_path):
"""A single check mapped to several sections must be counted in
every section it belongs to, not only the first one seen."""
reqs = [
UniversalComplianceRequirement(
id="1.1",
description="test",
attributes={"Section": "IAM"},
checks={"aws": ["check_a", "check_b"]},
),
UniversalComplianceRequirement(
id="2.1",
description="test2",
attributes={"Section": "Logging"},
checks={"aws": ["check_a"]},
),
]
tc = TableConfig(group_by="Section")
fw = _make_framework(reqs, tc)
# check_a (FAIL) belongs to both IAM and Logging sections; check_b
# (PASS, IAM only) is added so the overview total reaches 2 and the
# results table is rendered.
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
bulk_metadata = {
"check_a": MagicMock(Compliance=[]),
"check_b": MagicMock(Compliance=[]),
}
get_universal_table(
findings,
bulk_metadata,
"test_fw",
"output",
str(tmp_path),
False,
framework=fw,
)
captured = capsys.readouterr()
plain = _strip_ansi(captured.out)
# Both the IAM and Logging rows must report FAIL(1). Before the fix the
# second section seen (Logging) was undercounted to FAIL(0) and rendered
# as PASS. Anchor each occurrence to its own table row so an unrelated
# "FAIL(1)" elsewhere cannot mask an undercount.
iam_row = [
line for line in plain.splitlines() if "IAM" in line and "FAIL(1)" in line
]
logging_row = [
line
for line in plain.splitlines()
if "Logging" in line and "FAIL(1)" in line
]
assert len(iam_row) == 1
assert len(logging_row) == 1
def test_grouped_multi_section_muted_not_undercounted(self, capsys, tmp_path):
"""A single MUTED finding mapped to several groups must be counted in
the per-group Muted column of every group it belongs to."""
reqs = [
UniversalComplianceRequirement(
id="1.1",
description="test",
attributes={"Section": "IAM"},
checks={"aws": ["check_a", "check_b"]},
),
UniversalComplianceRequirement(
id="2.1",
description="test2",
attributes={"Section": "Logging"},
checks={"aws": ["check_a"]},
),
]
tc = TableConfig(group_by="Section")
fw = _make_framework(reqs, tc)
# check_a is MUTED and belongs to both IAM and Logging; check_b is a
# plain FAIL so the overview total reaches 2 and the table is rendered.
findings = [
_make_finding("check_a", "FAIL", muted=True),
_make_finding("check_b", "FAIL"),
]
bulk_metadata = {
"check_a": MagicMock(Compliance=[]),
"check_b": MagicMock(Compliance=[]),
}
get_universal_table(
findings,
bulk_metadata,
"test_fw",
"output",
str(tmp_path),
False,
framework=fw,
)
captured = capsys.readouterr()
plain = _strip_ansi(captured.out)
# The muted finding belongs to both sections, so both the IAM row and
# the Logging row must carry a Muted count of 1 in their last cell.
muted_one_rows = re.findall(r"\s*1\s*│\s*$", plain, flags=re.MULTILINE)
assert len(muted_one_rows) == 2
class TestSplitMode:
def test_split_rendering(self, capsys):
"""Test cases for split-mode universal compliance table rendering."""
def test_split_rendering(self, capsys, tmp_path):
reqs = [
UniversalComplianceRequirement(
id="1.1",
@@ -204,7 +324,7 @@ class TestSplitMode:
bulk_metadata,
"test_fw",
"output",
"/tmp",
str(tmp_path),
False,
framework=fw,
)
@@ -214,9 +334,119 @@ class TestSplitMode:
assert "Level 1" in captured.out
assert "Level 2" in captured.out
def test_split_muted_multi_section_not_undercounted(self, capsys, tmp_path):
"""In split mode a single MUTED finding mapped to several groups must
be counted in the Muted column of every group it belongs to."""
reqs = [
UniversalComplianceRequirement(
id="1.1",
description="test",
attributes={"Section": "Storage", "Profile": "Level 1"},
checks={"aws": ["check_a", "check_b"]},
),
UniversalComplianceRequirement(
id="2.1",
description="test2",
attributes={"Section": "Logging", "Profile": "Level 1"},
checks={"aws": ["check_a"]},
),
]
tc = TableConfig(
group_by="Section",
split_by=SplitByConfig(field="Profile", values=["Level 1", "Level 2"]),
)
fw = _make_framework(reqs, tc)
# check_a is MUTED and belongs to both Storage and Logging; check_b is a
# plain FAIL so the table is rendered.
findings = [
_make_finding("check_a", "FAIL", muted=True),
_make_finding("check_b", "FAIL"),
]
bulk_metadata = {
"check_a": MagicMock(Compliance=[]),
"check_b": MagicMock(Compliance=[]),
}
get_universal_table(
findings,
bulk_metadata,
"test_fw",
"output",
str(tmp_path),
False,
framework=fw,
)
captured = capsys.readouterr()
plain = _strip_ansi(captured.out)
# Both section rows must carry a Muted count of 1 (last cell). Before the
# fix only the first group seen incremented Muted, leaving the other 0.
muted_one_rows = re.findall(r"\s*1\s*│\s*$", plain, flags=re.MULTILINE)
assert len(muted_one_rows) == 2
def test_split_same_group_value_not_double_counted(self, capsys, tmp_path):
"""A single finding whose check maps to several requirements that share
the same group and split value must count once for that group/split,
not once per requirement (FAIL(1), never FAIL(2))."""
reqs = [
# check_a appears in two requirements, both Storage / Level 1.
UniversalComplianceRequirement(
id="1.1",
description="test",
attributes={"Section": "Storage", "Profile": "Level 1"},
checks={"aws": ["check_a"]},
),
UniversalComplianceRequirement(
id="1.2",
description="test2",
attributes={"Section": "Storage", "Profile": "Level 1"},
checks={"aws": ["check_a"]},
),
# A second group so the table renders with more than one finding.
UniversalComplianceRequirement(
id="2.1",
description="test3",
attributes={"Section": "Logging", "Profile": "Level 1"},
checks={"aws": ["check_b"]},
),
]
tc = TableConfig(
group_by="Section",
split_by=SplitByConfig(field="Profile", values=["Level 1", "Level 2"]),
)
fw = _make_framework(reqs, tc)
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
bulk_metadata = {
"check_a": MagicMock(Compliance=[]),
"check_b": MagicMock(Compliance=[]),
}
get_universal_table(
findings,
bulk_metadata,
"test_fw",
"output",
str(tmp_path),
False,
framework=fw,
)
captured = capsys.readouterr()
plain = _strip_ansi(captured.out)
# The Storage row must show FAIL(1) for Level 1, never FAIL(2).
assert "FAIL(1)" in plain
assert "FAIL(2)" not in plain
class TestScoredMode:
def test_scored_rendering(self, capsys):
"""Test cases for scored-mode universal compliance table rendering."""
def test_scored_rendering(self, capsys, tmp_path):
reqs = [
UniversalComplianceRequirement(
id="1.1",
@@ -251,7 +481,7 @@ class TestScoredMode:
bulk_metadata,
"test_fw",
"output",
"/tmp",
str(tmp_path),
False,
framework=fw,
)
@@ -261,9 +491,68 @@ class TestScoredMode:
assert "Score" in captured.out
assert "Threat Score" in captured.out
def test_scored_multi_section_fail_not_undercounted(self, capsys, tmp_path):
"""In scored mode a single FAIL finding mapped to several groups must
show FAIL(1) in every group it belongs to, not only the first one."""
reqs = [
UniversalComplianceRequirement(
id="1.1",
description="test",
attributes={"Section": "IAM", "LevelOfRisk": 5, "Weight": 100},
checks={"aws": ["check_a", "check_b"]},
),
UniversalComplianceRequirement(
id="2.1",
description="test2",
attributes={"Section": "Logging", "LevelOfRisk": 3, "Weight": 50},
checks={"aws": ["check_a"]},
),
]
tc = TableConfig(
group_by="Section",
scoring=ScoringConfig(risk_field="LevelOfRisk", weight_field="Weight"),
)
fw = _make_framework(reqs, tc)
# check_a (FAIL) belongs to both IAM and Logging; check_b (PASS, IAM
# only) raises the overview total to 2 so the table is rendered.
findings = [
_make_finding("check_a", "FAIL"),
_make_finding("check_b", "PASS"),
]
bulk_metadata = {
"check_a": MagicMock(Compliance=[]),
"check_b": MagicMock(Compliance=[]),
}
get_universal_table(
findings,
bulk_metadata,
"test_fw",
"output",
str(tmp_path),
False,
framework=fw,
)
captured = capsys.readouterr()
plain = _strip_ansi(captured.out)
iam_row = [
line for line in plain.splitlines() if "IAM" in line and "FAIL(1)" in line
]
logging_row = [
line
for line in plain.splitlines()
if "Logging" in line and "FAIL(1)" in line
]
assert len(iam_row) == 1
assert len(logging_row) == 1
class TestCustomLabels:
def test_ens_spanish_labels(self, capsys):
"""Test cases for custom-label universal compliance table rendering."""
def test_ens_spanish_labels(self, capsys, tmp_path):
reqs = [
UniversalComplianceRequirement(
id="1.1",
@@ -300,7 +589,7 @@ class TestCustomLabels:
bulk_metadata,
"test_fw",
"output",
"/tmp",
str(tmp_path),
False,
framework=fw,
)
@@ -311,7 +600,9 @@ class TestCustomLabels:
class TestMultiProviderDictChecks:
def test_only_aws_checks_matched(self, capsys):
"""Test cases for multi-provider dict checks in the universal table."""
def test_only_aws_checks_matched(self, capsys, tmp_path):
"""With dict checks and provider='aws', only AWS checks match findings."""
reqs = [
UniversalComplianceRequirement(
@@ -352,7 +643,7 @@ class TestMultiProviderDictChecks:
bulk_metadata,
"multi_cloud",
"output",
"/tmp",
str(tmp_path),
False,
framework=fw,
provider="aws",
@@ -366,7 +657,9 @@ class TestMultiProviderDictChecks:
class TestNoTableConfig:
def test_returns_early_without_table_config(self, capsys):
"""Test cases for the universal table when no table config is present."""
def test_returns_early_without_table_config(self, capsys, tmp_path):
fw = ComplianceFramework(
framework="TestFW",
name="Test",
@@ -374,11 +667,11 @@ class TestNoTableConfig:
description="Test",
requirements=[],
)
get_universal_table([], {}, "test", "out", "/tmp", False, framework=fw)
get_universal_table([], {}, "test", "out", str(tmp_path), False, framework=fw)
captured = capsys.readouterr()
assert captured.out == ""
def test_returns_early_without_framework(self, capsys):
get_universal_table([], {}, "test", "out", "/tmp", False, framework=None)
def test_returns_early_without_framework(self, capsys, tmp_path):
get_universal_table([], {}, "test", "out", str(tmp_path), False, framework=None)
captured = capsys.readouterr()
assert captured.out == ""