diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md index 24fc4d4e49..12a89e61b3 100644 --- a/prowler/CHANGELOG.md +++ b/prowler/CHANGELOG.md @@ -2,6 +2,14 @@ All notable changes to the **Prowler SDK** are documented in this file. +## [5.30.3] (Prowler UNRELEASED) + +### 🐞 Fixed + +- CLI compliance summary tables no longer undercount findings mapped to multiple sections nor double-count a single finding mapped to several requirements within the same group/split, and the Provider column no longer leaks a value from another framework [(#11567)](https://github.com/prowler-cloud/prowler/pull/11567) + +--- + ## [5.30.2] (Prowler v5.30.2) ### 🐞 Fixed diff --git a/prowler/lib/outputs/compliance/asd_essential_eight/asd_essential_eight.py b/prowler/lib/outputs/compliance/asd_essential_eight/asd_essential_eight.py index 75481fb6aa..df23aeb1d1 100644 --- a/prowler/lib/outputs/compliance/asd_essential_eight/asd_essential_eight.py +++ b/prowler/lib/outputs/compliance/asd_essential_eight/asd_essential_eight.py @@ -22,11 +22,14 @@ def get_asd_essential_eight_table( pass_count = [] fail_count = [] muted_count = [] + section_seen = {} + provider = "" for index, finding in enumerate(findings): check = bulk_checks_metadata[finding.check_metadata.CheckID] check_compliances = check.Compliance for compliance in check_compliances: if compliance.Framework == "ASD-Essential-Eight": + provider = compliance.Provider for requirement in compliance.Requirements: for attribute in requirement.Attributes: section = attribute.Section @@ -36,21 +39,33 @@ def get_asd_essential_eight_table( "PASS": 0, "Muted": 0, } + section_seen[section] = set() + + # Overview totals: count each finding once per framework if finding.muted: if index not in muted_count: muted_count.append(index) - sections[section]["Muted"] += 1 - else: - if finding.status == "FAIL" and index not in fail_count: + elif finding.status == "FAIL": + if index not in fail_count: fail_count.append(index) - sections[section]["FAIL"] += 1 - elif finding.status == "PASS" and index not in pass_count: + elif finding.status == "PASS": + if index not in pass_count: pass_count.append(index) + + # Per-section counts: count each finding once per section + # it belongs to (a finding can map to several sections). + if index not in section_seen[section]: + section_seen[section].add(index) + if finding.muted: + sections[section]["Muted"] += 1 + elif finding.status == "FAIL": + sections[section]["FAIL"] += 1 + elif finding.status == "PASS": sections[section]["PASS"] += 1 sections = dict(sorted(sections.items())) for section in sections: - asd_essential_eight_compliance_table["Provider"].append(compliance.Provider) + asd_essential_eight_compliance_table["Provider"].append(provider) asd_essential_eight_compliance_table["Section"].append(section) if sections[section]["FAIL"] > 0: asd_essential_eight_compliance_table["Status"].append( diff --git a/prowler/lib/outputs/compliance/c5/c5.py b/prowler/lib/outputs/compliance/c5/c5.py index 32eb7e0f5a..b48260b5a2 100644 --- a/prowler/lib/outputs/compliance/c5/c5.py +++ b/prowler/lib/outputs/compliance/c5/c5.py @@ -22,33 +22,47 @@ def get_c5_table( fail_count = [] muted_count = [] sections = {} + section_seen = {} + provider = "" for index, finding in enumerate(findings): check = bulk_checks_metadata[finding.check_metadata.CheckID] check_compliances = check.Compliance for compliance in check_compliances: if compliance.Framework == "C5": + provider = compliance.Provider for requirement in compliance.Requirements: for attribute in requirement.Attributes: section = attribute.Section if section not in sections: sections[section] = {"FAIL": 0, "PASS": 0, "Muted": 0} + section_seen[section] = set() + # Overview totals: count each finding once per framework if finding.muted: if index not in muted_count: muted_count.append(index) - sections[section]["Muted"] += 1 - else: - if finding.status == "FAIL" and index not in fail_count: + elif finding.status == "FAIL": + if index not in fail_count: fail_count.append(index) - sections[section]["FAIL"] += 1 - elif finding.status == "PASS" and index not in pass_count: + elif finding.status == "PASS": + if index not in pass_count: pass_count.append(index) + + # Per-section counts: count each finding once per section + # it belongs to (a finding can map to several sections). + if index not in section_seen[section]: + section_seen[section].add(index) + if finding.muted: + sections[section]["Muted"] += 1 + elif finding.status == "FAIL": + sections[section]["FAIL"] += 1 + elif finding.status == "PASS": sections[section]["PASS"] += 1 sections = dict(sorted(sections.items())) for section in sections: - section_table["Provider"].append(compliance.Provider) + section_table["Provider"].append(provider) section_table["Section"].append(section) if sections[section]["FAIL"] > 0: section_table["Status"].append( diff --git a/prowler/lib/outputs/compliance/ccc/ccc.py b/prowler/lib/outputs/compliance/ccc/ccc.py index 99a6c91cd9..d8d76ad2d9 100644 --- a/prowler/lib/outputs/compliance/ccc/ccc.py +++ b/prowler/lib/outputs/compliance/ccc/ccc.py @@ -22,33 +22,47 @@ def get_ccc_table( fail_count = [] muted_count = [] sections = {} + section_seen = {} + provider = "" for index, finding in enumerate(findings): check = bulk_checks_metadata[finding.check_metadata.CheckID] check_compliances = check.Compliance for compliance in check_compliances: if compliance.Framework == "CCC": + provider = compliance.Provider for requirement in compliance.Requirements: for attribute in requirement.Attributes: section = attribute.Section if section not in sections: sections[section] = {"FAIL": 0, "PASS": 0, "Muted": 0} + section_seen[section] = set() + # Overview totals: count each finding once per framework if finding.muted: if index not in muted_count: muted_count.append(index) - sections[section]["Muted"] += 1 - else: - if finding.status == "FAIL" and index not in fail_count: + elif finding.status == "FAIL": + if index not in fail_count: fail_count.append(index) - sections[section]["FAIL"] += 1 - elif finding.status == "PASS" and index not in pass_count: + elif finding.status == "PASS": + if index not in pass_count: pass_count.append(index) + + # Per-section counts: count each finding once per section + # it belongs to (a finding can map to several sections). + if index not in section_seen[section]: + section_seen[section].add(index) + if finding.muted: + sections[section]["Muted"] += 1 + elif finding.status == "FAIL": + sections[section]["FAIL"] += 1 + elif finding.status == "PASS": sections[section]["PASS"] += 1 sections = dict(sorted(sections.items())) for section in sections: - section_table["Provider"].append(compliance.Provider) + section_table["Provider"].append(provider) section_table["Section"].append(section) if sections[section]["FAIL"] > 0: section_table["Status"].append( diff --git a/prowler/lib/outputs/compliance/cis/cis.py b/prowler/lib/outputs/compliance/cis/cis.py index 7f161f34c2..4acbd7fe58 100644 --- a/prowler/lib/outputs/compliance/cis/cis.py +++ b/prowler/lib/outputs/compliance/cis/cis.py @@ -13,6 +13,9 @@ def get_cis_table( compliance_overview: bool, ): sections = {} + section_muted_seen = {} + section_split_seen = {} + provider = "" cis_compliance_table = { "Provider": [], "Section": [], @@ -29,6 +32,7 @@ def get_cis_table( for compliance in check_compliances: version_in_name = compliance_framework.split("_")[1] if compliance.Framework == "CIS" and version_in_name in compliance.Version: + provider = compliance.Provider for requirement in compliance.Requirements: for attribute in requirement.Attributes: section = attribute.Section @@ -40,9 +44,19 @@ def get_cis_table( "Level 2": {"FAIL": 0, "PASS": 0}, "Muted": 0, } + section_muted_seen[section] = set() + section_split_seen[section] = { + "Level 1": set(), + "Level 2": set(), + } if finding.muted: + # Overview total: count each finding once per framework if index not in muted_count: muted_count.append(index) + # Per-section Muted: count each finding once per section + # it belongs to (a finding can map to several sections). + if index not in section_muted_seen[section]: + section_muted_seen[section].add(index) sections[section]["Muted"] += 1 else: if finding.status == "FAIL" and index not in fail_count: @@ -50,13 +64,21 @@ def get_cis_table( elif finding.status == "PASS" and index not in pass_count: pass_count.append(index) if "Level 1" in attribute.Profile: - if not finding.muted: + if ( + not finding.muted + and index not in section_split_seen[section]["Level 1"] + ): + section_split_seen[section]["Level 1"].add(index) if finding.status == "FAIL": sections[section]["Level 1"]["FAIL"] += 1 else: sections[section]["Level 1"]["PASS"] += 1 elif "Level 2" in attribute.Profile: - if not finding.muted: + if ( + not finding.muted + and index not in section_split_seen[section]["Level 2"] + ): + section_split_seen[section]["Level 2"].add(index) if finding.status == "FAIL": sections[section]["Level 2"]["FAIL"] += 1 else: @@ -65,7 +87,7 @@ def get_cis_table( # Add results to table sections = dict(sorted(sections.items())) for section in sections: - cis_compliance_table["Provider"].append(compliance.Provider) + cis_compliance_table["Provider"].append(provider) cis_compliance_table["Section"].append(section) if sections[section]["Level 1"]["FAIL"] > 0: cis_compliance_table["Level 1"].append( diff --git a/prowler/lib/outputs/compliance/ens/ens.py b/prowler/lib/outputs/compliance/ens/ens.py index a414a0206d..c39abe0a6a 100644 --- a/prowler/lib/outputs/compliance/ens/ens.py +++ b/prowler/lib/outputs/compliance/ens/ens.py @@ -13,6 +13,8 @@ def get_ens_table( compliance_overview: bool, ): marcos = {} + marco_muted_seen = {} + provider = "" ens_compliance_table = { "Proveedor": [], "Marco/Categoria": [], @@ -31,6 +33,7 @@ def get_ens_table( check_compliances = check.Compliance for compliance in check_compliances: if compliance.Framework == "ENS": + provider = compliance.Provider for requirement in compliance.Requirements: for attribute in requirement.Attributes: marco_categoria = f"{attribute.Marco}/{attribute.Categoria}" @@ -44,17 +47,23 @@ def get_ens_table( "Bajo": 0, "Muted": 0, } + marco_muted_seen[marco_categoria] = set() if finding.muted: + # Overview total: count each finding once per framework if index not in muted_count: muted_count.append(index) + # Per-marco Muted: count each finding once per marco + # it belongs to (a finding can map to several marcos). + if index not in marco_muted_seen[marco_categoria]: + marco_muted_seen[marco_categoria].add(index) marcos[marco_categoria]["Muted"] += 1 else: if finding.status == "FAIL": - if ( - attribute.Tipo != "recomendacion" - and index not in fail_count - ): - fail_count.append(index) + if attribute.Tipo != "recomendacion": + if index not in fail_count: + fail_count.append(index) + # Mark every marco the finding belongs to as + # NO CUMPLE, not just the first one seen. marcos[marco_categoria][ "Estado" ] = f"{Fore.RED}NO CUMPLE{Style.RESET_ALL}" @@ -71,7 +80,7 @@ def get_ens_table( # Add results to table for marco in sorted(marcos): - ens_compliance_table["Proveedor"].append(compliance.Provider) + ens_compliance_table["Proveedor"].append(provider) ens_compliance_table["Marco/Categoria"].append(marco) ens_compliance_table["Estado"].append(marcos[marco]["Estado"]) ens_compliance_table["Opcional"].append( diff --git a/prowler/lib/outputs/compliance/kisa_ismsp/kisa_ismsp.py b/prowler/lib/outputs/compliance/kisa_ismsp/kisa_ismsp.py index 93b925a5ff..e7c00b188d 100644 --- a/prowler/lib/outputs/compliance/kisa_ismsp/kisa_ismsp.py +++ b/prowler/lib/outputs/compliance/kisa_ismsp/kisa_ismsp.py @@ -13,7 +13,9 @@ def get_kisa_ismsp_table( compliance_overview: bool, ): sections = {} + section_seen = {} sections_status = {} + provider = "" kisa_ismsp_compliance_table = { "Provider": [], "Section": [], @@ -31,6 +33,7 @@ def get_kisa_ismsp_table( compliance.Framework.startswith("KISA") and compliance.Version in compliance_framework ): + provider = compliance.Provider for requirement in compliance.Requirements: for attribute in requirement.Attributes: section = attribute.Section @@ -43,16 +46,28 @@ def get_kisa_ismsp_table( }, "Muted": 0, } + section_seen[section] = set() + + # Overview totals: count each finding once per framework if finding.muted: if index not in muted_count: muted_count.append(index) - sections[section]["Muted"] += 1 - else: - if finding.status == "FAIL" and index not in fail_count: + elif finding.status == "FAIL": + if index not in fail_count: fail_count.append(index) - sections[section]["Status"]["FAIL"] += 1 - elif finding.status == "PASS" and index not in pass_count: + elif finding.status == "PASS": + if index not in pass_count: pass_count.append(index) + + # Per-section counts: count each finding once per section + # it belongs to (a finding can map to several sections). + if index not in section_seen[section]: + section_seen[section].add(index) + if finding.muted: + sections[section]["Muted"] += 1 + elif finding.status == "FAIL": + sections[section]["Status"]["FAIL"] += 1 + elif finding.status == "PASS": sections[section]["Status"]["PASS"] += 1 # Add results to table @@ -70,7 +85,7 @@ def get_kisa_ismsp_table( else: sections_status[section] = f"{Fore.GREEN}PASS{Style.RESET_ALL}" for section in sections: - kisa_ismsp_compliance_table["Provider"].append(compliance.Provider) + kisa_ismsp_compliance_table["Provider"].append(provider) kisa_ismsp_compliance_table["Section"].append(section) kisa_ismsp_compliance_table["Status"].append(sections_status[section]) kisa_ismsp_compliance_table["Muted"].append( diff --git a/prowler/lib/outputs/compliance/mitre_attack/mitre_attack.py b/prowler/lib/outputs/compliance/mitre_attack/mitre_attack.py index bab3e4e31a..7492624aff 100644 --- a/prowler/lib/outputs/compliance/mitre_attack/mitre_attack.py +++ b/prowler/lib/outputs/compliance/mitre_attack/mitre_attack.py @@ -13,6 +13,8 @@ def get_mitre_attack_table( compliance_overview: bool, ): tactics = {} + tactic_seen = {} + provider = "" mitre_compliance_table = { "Provider": [], "Tactic": [], @@ -30,27 +32,38 @@ def get_mitre_attack_table( "MITRE-ATTACK" in compliance.Framework and compliance.Version in compliance_framework ): + provider = compliance.Provider for requirement in compliance.Requirements: for tactic in requirement.Tactics: if tactic not in tactics: tactics[tactic] = {"FAIL": 0, "PASS": 0, "Muted": 0} + tactic_seen[tactic] = set() + + # Overview totals: count each finding once per framework if finding.muted: if index not in muted_count: muted_count.append(index) + elif finding.status == "FAIL": + if index not in fail_count: + fail_count.append(index) + elif finding.status == "PASS": + if index not in pass_count: + pass_count.append(index) + + # Per-tactic counts: count each finding once per tactic + # it belongs to (a finding can map to several tactics). + if index not in tactic_seen[tactic]: + tactic_seen[tactic].add(index) + if finding.muted: tactics[tactic]["Muted"] += 1 - else: - if finding.status == "FAIL": - if index not in fail_count: - fail_count.append(index) - tactics[tactic]["FAIL"] += 1 + elif finding.status == "FAIL": + tactics[tactic]["FAIL"] += 1 elif finding.status == "PASS": - if index not in pass_count: - pass_count.append(index) - tactics[tactic]["PASS"] += 1 + tactics[tactic]["PASS"] += 1 # Add results to table tactics = dict(sorted(tactics.items())) for tactic in tactics: - mitre_compliance_table["Provider"].append(compliance.Provider) + mitre_compliance_table["Provider"].append(provider) mitre_compliance_table["Tactic"].append(tactic) if tactics[tactic]["FAIL"] > 0: mitre_compliance_table["Status"].append( diff --git a/prowler/lib/outputs/compliance/okta_idaas_stig/okta_idaas_stig.py b/prowler/lib/outputs/compliance/okta_idaas_stig/okta_idaas_stig.py index 5c76055a06..1febe02f60 100644 --- a/prowler/lib/outputs/compliance/okta_idaas_stig/okta_idaas_stig.py +++ b/prowler/lib/outputs/compliance/okta_idaas_stig/okta_idaas_stig.py @@ -22,33 +22,47 @@ def get_okta_idaas_stig_table( fail_count = [] muted_count = [] sections = {} + section_seen = {} + provider = "" for index, finding in enumerate(findings): check = bulk_checks_metadata[finding.check_metadata.CheckID] check_compliances = check.Compliance for compliance in check_compliances: if compliance.Framework == "Okta-IDaaS-STIG": + provider = compliance.Provider for requirement in compliance.Requirements: for attribute in requirement.Attributes: section = attribute.Section if section not in sections: sections[section] = {"FAIL": 0, "PASS": 0, "Muted": 0} + section_seen[section] = set() + # Overview totals: count each finding once per framework if finding.muted: if index not in muted_count: muted_count.append(index) - sections[section]["Muted"] += 1 - else: - if finding.status == "FAIL" and index not in fail_count: + elif finding.status == "FAIL": + if index not in fail_count: fail_count.append(index) - sections[section]["FAIL"] += 1 - elif finding.status == "PASS" and index not in pass_count: + elif finding.status == "PASS": + if index not in pass_count: pass_count.append(index) + + # Per-section counts: count each finding once per section + # it belongs to (a finding can map to several sections). + if index not in section_seen[section]: + section_seen[section].add(index) + if finding.muted: + sections[section]["Muted"] += 1 + elif finding.status == "FAIL": + sections[section]["FAIL"] += 1 + elif finding.status == "PASS": sections[section]["PASS"] += 1 sections = dict(sorted(sections.items())) for section in sections: - section_table["Provider"].append(compliance.Provider) + section_table["Provider"].append(provider) section_table["Section"].append(section) if sections[section]["FAIL"] > 0: section_table["Status"].append( diff --git a/prowler/lib/outputs/compliance/prowler_threatscore/prowler_threatscore.py b/prowler/lib/outputs/compliance/prowler_threatscore/prowler_threatscore.py index cfcd4a006e..b17307f04a 100644 --- a/prowler/lib/outputs/compliance/prowler_threatscore/prowler_threatscore.py +++ b/prowler/lib/outputs/compliance/prowler_threatscore/prowler_threatscore.py @@ -24,6 +24,8 @@ def get_prowler_threatscore_table( fail_count = [] muted_count = [] pillars = {} + pillar_seen = {} + provider = "" generic_score = 0 max_generic_score = 0 counted_findings_generic = [] @@ -35,6 +37,7 @@ def get_prowler_threatscore_table( check_compliances = check.Compliance for compliance in check_compliances: if compliance.Framework == "ProwlerThreatScore": + provider = compliance.Provider for requirement in compliance.Requirements: for attribute in requirement.Attributes: pillar = attribute.Section @@ -65,17 +68,28 @@ def get_prowler_threatscore_table( if pillar not in pillars: pillars[pillar] = {"FAIL": 0, "PASS": 0, "Muted": 0} + pillar_seen[pillar] = set() + # Overview totals: count each finding once per framework if finding.muted: if index not in muted_count: muted_count.append(index) - pillars[pillar]["Muted"] += 1 - else: - if finding.status == "FAIL" and index not in fail_count: + elif finding.status == "FAIL": + if index not in fail_count: fail_count.append(index) - pillars[pillar]["FAIL"] += 1 - elif finding.status == "PASS" and index not in pass_count: + elif finding.status == "PASS": + if index not in pass_count: pass_count.append(index) + + # Per-pillar counts: count each finding once per pillar + # it belongs to (a finding can map to several pillars). + if index not in pillar_seen[pillar]: + pillar_seen[pillar].add(index) + if finding.muted: + pillars[pillar]["Muted"] += 1 + elif finding.status == "FAIL": + pillars[pillar]["FAIL"] += 1 + elif finding.status == "PASS": pillars[pillar]["PASS"] += 1 # Generic score @@ -90,18 +104,21 @@ def get_prowler_threatscore_table( counted_findings_generic.append(index) no_findings_pillars = [] - bulk_compliance = Compliance.get_bulk(provider=compliance.Provider.lower()).get( - compliance_framework + bulk_compliance = ( + Compliance.get_bulk(provider=provider.lower()).get(compliance_framework) + if provider + else None ) - for requirement in bulk_compliance.Requirements: - for attribute in requirement.Attributes: - pillar = attribute.Section - if pillar not in pillars.keys() and pillar not in no_findings_pillars: - no_findings_pillars.append(pillar) + if bulk_compliance: + for requirement in bulk_compliance.Requirements: + for attribute in requirement.Attributes: + pillar = attribute.Section + if pillar not in pillars.keys() and pillar not in no_findings_pillars: + no_findings_pillars.append(pillar) pillars = dict(sorted(pillars.items())) for pillar in pillars: - pillar_table["Provider"].append(compliance.Provider) + pillar_table["Provider"].append(provider) pillar_table["Pillar"].append(pillar) if max_score_per_pillar[pillar] == 0: pillar_score = 100.0 @@ -127,7 +144,7 @@ def get_prowler_threatscore_table( ) for pillar in no_findings_pillars: - pillar_table["Provider"].append(compliance.Provider) + pillar_table["Provider"].append(provider) pillar_table["Pillar"].append(pillar) pillar_table["Score"].append(f"{Style.BRIGHT}{Fore.GREEN}100%{Style.RESET_ALL}") pillar_table["Status"].append(f"{Fore.GREEN}PASS{Style.RESET_ALL}") diff --git a/prowler/lib/outputs/compliance/universal/universal_table.py b/prowler/lib/outputs/compliance/universal/universal_table.py index e838c5e9cf..e54ad5155c 100644 --- a/prowler/lib/outputs/compliance/universal/universal_table.py +++ b/prowler/lib/outputs/compliance/universal/universal_table.py @@ -163,6 +163,7 @@ def _render_grouped( """Grouped mode: one row per group with pass/fail counts.""" check_map = _build_requirement_check_map(framework, provider) groups = {} + group_seen = {} pass_count = [] fail_count = [] muted_count = [] @@ -176,17 +177,28 @@ def _render_grouped( for group_key in _get_group_key(req, group_by): if group_key not in groups: groups[group_key] = {"FAIL": 0, "PASS": 0, "Muted": 0} + group_seen[group_key] = set() + # Overview totals: count each finding once per framework if finding.muted: if index not in muted_count: muted_count.append(index) - groups[group_key]["Muted"] += 1 - else: - if finding.status == "FAIL" and index not in fail_count: + elif finding.status == "FAIL": + if index not in fail_count: fail_count.append(index) - groups[group_key]["FAIL"] += 1 - elif finding.status == "PASS" and index not in pass_count: + elif finding.status == "PASS": + if index not in pass_count: pass_count.append(index) + + # Per-group counts: count each finding once per group it belongs + # to (a finding can map to several groups via several requirements). + if index not in group_seen[group_key]: + group_seen[group_key].add(index) + if finding.muted: + groups[group_key]["Muted"] += 1 + elif finding.status == "FAIL": + groups[group_key]["FAIL"] += 1 + elif finding.status == "PASS": groups[group_key]["PASS"] += 1 if not _print_overview( @@ -258,6 +270,8 @@ def _render_split( split_field = split_by.field split_values = split_by.values groups = {} + group_muted_seen = {} + group_split_seen = {} pass_count = [] fail_count = [] muted_count = [] @@ -274,12 +288,19 @@ def _render_split( sv: {"FAIL": 0, "PASS": 0} for sv in split_values } groups[group_key]["Muted"] = 0 + group_muted_seen[group_key] = set() + group_split_seen[group_key] = {sv: set() for sv in split_values} split_val = req.attributes.get(split_field, "") if finding.muted: + # Overview total: count each finding once per framework if index not in muted_count: muted_count.append(index) + # Per-group Muted: count each finding once per group it + # belongs to (a finding can map to several groups). + if index not in group_muted_seen[group_key]: + group_muted_seen[group_key].add(index) groups[group_key]["Muted"] += 1 else: if finding.status == "FAIL" and index not in fail_count: @@ -289,7 +310,8 @@ def _render_split( for sv in split_values: if sv in str(split_val): - if not finding.muted: + if index not in group_split_seen[group_key][sv]: + group_split_seen[group_key][sv].add(index) if finding.status == "FAIL": groups[group_key][sv]["FAIL"] += 1 else: @@ -364,6 +386,7 @@ def _render_scored( risk_field = scoring.risk_field weight_field = scoring.weight_field groups = {} + group_seen = {} pass_count = [] fail_count = [] muted_count = [] @@ -388,6 +411,7 @@ def _render_scored( if group_key not in groups: groups[group_key] = {"FAIL": 0, "PASS": 0, "Muted": 0} + group_seen[group_key] = set() score_per_group[group_key] = 0 max_score_per_group[group_key] = 0 counted_per_group[group_key] = [] @@ -398,16 +422,26 @@ def _render_scored( max_score_per_group[group_key] += risk * weight counted_per_group[group_key].append(index) + # Overview totals: count each finding once per framework if finding.muted: if index not in muted_count: muted_count.append(index) - groups[group_key]["Muted"] += 1 - else: - if finding.status == "FAIL" and index not in fail_count: + elif finding.status == "FAIL": + if index not in fail_count: fail_count.append(index) - groups[group_key]["FAIL"] += 1 - elif finding.status == "PASS" and index not in pass_count: + elif finding.status == "PASS": + if index not in pass_count: pass_count.append(index) + + # Per-group counts: count each finding once per group it belongs + # to (a finding can map to several groups via several requirements). + if index not in group_seen[group_key]: + group_seen[group_key].add(index) + if finding.muted: + groups[group_key]["Muted"] += 1 + elif finding.status == "FAIL": + groups[group_key]["FAIL"] += 1 + elif finding.status == "PASS": groups[group_key]["PASS"] += 1 if index not in counted_generic and not finding.muted: diff --git a/tests/lib/outputs/compliance/asd_essential_eight/asd_essential_eight_table_test.py b/tests/lib/outputs/compliance/asd_essential_eight/asd_essential_eight_table_test.py new file mode 100644 index 0000000000..6382dc1d2c --- /dev/null +++ b/tests/lib/outputs/compliance/asd_essential_eight/asd_essential_eight_table_test.py @@ -0,0 +1,132 @@ +import re +from types import SimpleNamespace + +from prowler.lib.outputs.compliance.asd_essential_eight.asd_essential_eight import ( + get_asd_essential_eight_table, +) + + +def _make_finding(check_id, status="PASS", muted=False): + return SimpleNamespace( + check_metadata=SimpleNamespace(CheckID=check_id), + status=status, + muted=muted, + ) + + +def _make_compliance(provider, sections, framework="ASD-Essential-Eight"): + """Build a per-check compliance covering the given sections.""" + return SimpleNamespace( + Framework=framework, + Provider=provider, + Requirements=[ + SimpleNamespace(Attributes=[SimpleNamespace(Section=section)]) + for section in sections + ], + ) + + +class TestASDEssentialEightTable: + """Test cases verifying multi-section counting and provider-column attribution for the ASD Essential Eight compliance table.""" + + def test_multi_section_fail_not_undercounted(self, capsys, tmp_path): + """A single FAIL check mapped to several sections must show FAIL(1) in + every section, not just the first one seen.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[_make_compliance("aws", ["IAM", "Logging"])] + ), + "check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]), + } + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + + get_asd_essential_eight_table( + findings, + bulk_metadata, + "asd_essential_eight_aws", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + # Both IAM and Logging must report FAIL(1); before the fix Logging was + # undercounted because the per-section count was gated by the global + # dedup list. + assert captured.out.count("FAIL(1)") == 2 + + def test_multi_section_muted_not_undercounted(self, capsys, tmp_path): + """A single MUTED check mapped to several sections must increase the + per-section Muted count in every section, not only the first one.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[_make_compliance("aws", ["IAM", "Logging"])] + ), + "check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]), + } + findings = [ + _make_finding("check_a", "FAIL", muted=True), + # A real FAIL is needed so the results table is rendered at all. + _make_finding("check_b", "FAIL"), + ] + + get_asd_essential_eight_table( + findings, + bulk_metadata, + "asd_essential_eight_aws", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out) + # The muted check belongs to both IAM and Logging, so the Muted column + # must read 1 in both rows. + muted_cells = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE) + assert len(muted_cells) == 2 + + def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path): + """The Provider column must come from the matched ASD-Essential-Eight + compliance, never from a different framework that happens to be the last + entry in the check's compliance list.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[ + _make_compliance("aws", ["IAM"]), + _make_compliance( + "leaked_provider", ["Other"], framework="OtherFramework" + ), + ] + ), + "check_b": SimpleNamespace( + Compliance=[ + _make_compliance("aws", ["IAM"]), + _make_compliance( + "leaked_provider", ["Other"], framework="OtherFramework" + ), + ] + ), + } + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + + get_asd_essential_eight_table( + findings, + bulk_metadata, + "asd_essential_eight_aws", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + assert "aws" in captured.out + # The provider of the unrelated trailing framework must NOT leak into + # the rendered table. + assert "leaked_provider" not in captured.out diff --git a/tests/lib/outputs/compliance/c5/__init__.py b/tests/lib/outputs/compliance/c5/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/lib/outputs/compliance/c5/c5_table_test.py b/tests/lib/outputs/compliance/c5/c5_table_test.py new file mode 100644 index 0000000000..c423f5af0b --- /dev/null +++ b/tests/lib/outputs/compliance/c5/c5_table_test.py @@ -0,0 +1,130 @@ +import re +from types import SimpleNamespace + +from prowler.lib.outputs.compliance.c5.c5 import get_c5_table + + +def _make_finding(check_id, status="PASS", muted=False): + return SimpleNamespace( + check_metadata=SimpleNamespace(CheckID=check_id), + status=status, + muted=muted, + ) + + +def _make_compliance(provider, sections, framework="C5"): + """Build a per-check compliance covering the given sections.""" + return SimpleNamespace( + Framework=framework, + Provider=provider, + Requirements=[ + SimpleNamespace(Attributes=[SimpleNamespace(Section=section)]) + for section in sections + ], + ) + + +class TestC5Table: + """Verify multi-section counting and provider-column attribution for the compliance table.""" + + def test_multi_section_fail_not_undercounted(self, capsys, tmp_path): + """A single FAIL check mapped to several sections must show FAIL(1) in + every section, not just the first one seen.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[_make_compliance("aws", ["IAM", "Logging"])] + ), + "check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]), + } + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + + get_c5_table( + findings, + bulk_metadata, + "c5_aws", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + # Both IAM and Logging must report FAIL(1); before the fix Logging was + # undercounted because the per-section count was gated by the global + # dedup list. + assert captured.out.count("FAIL(1)") == 2 + + def test_multi_section_muted_not_undercounted(self, capsys, tmp_path): + """A single MUTED check mapped to several sections must increase the + per-section Muted count in every section, not only the first one.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[_make_compliance("aws", ["IAM", "Logging"])] + ), + "check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]), + } + findings = [ + _make_finding("check_a", "FAIL", muted=True), + # A real FAIL is needed so the results table is rendered at all. + _make_finding("check_b", "FAIL"), + ] + + get_c5_table( + findings, + bulk_metadata, + "c5_aws", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out) + # The muted check belongs to both IAM and Logging, so the Muted column + # must read 1 in both rows. + muted_cells = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE) + assert len(muted_cells) == 2 + + def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path): + """The Provider column must come from the matched C5 compliance, never + from a different framework that happens to be the last entry in the + check's compliance list.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[ + _make_compliance("aws", ["IAM"]), + _make_compliance( + "leaked_provider", ["Other"], framework="OtherFramework" + ), + ] + ), + "check_b": SimpleNamespace( + Compliance=[ + _make_compliance("aws", ["IAM"]), + _make_compliance( + "leaked_provider", ["Other"], framework="OtherFramework" + ), + ] + ), + } + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + + get_c5_table( + findings, + bulk_metadata, + "c5_aws", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + assert "aws" in captured.out + # The provider of the unrelated trailing framework must NOT leak into + # the rendered table. + assert "leaked_provider" not in captured.out diff --git a/tests/lib/outputs/compliance/ccc/ccc_table_test.py b/tests/lib/outputs/compliance/ccc/ccc_table_test.py new file mode 100644 index 0000000000..f647aff3d0 --- /dev/null +++ b/tests/lib/outputs/compliance/ccc/ccc_table_test.py @@ -0,0 +1,130 @@ +import re +from types import SimpleNamespace + +from prowler.lib.outputs.compliance.ccc.ccc import get_ccc_table + + +def _make_finding(check_id, status="PASS", muted=False): + return SimpleNamespace( + check_metadata=SimpleNamespace(CheckID=check_id), + status=status, + muted=muted, + ) + + +def _make_compliance(provider, sections, framework="CCC"): + """Build a per-check compliance covering the given sections.""" + return SimpleNamespace( + Framework=framework, + Provider=provider, + Requirements=[ + SimpleNamespace(Attributes=[SimpleNamespace(Section=section)]) + for section in sections + ], + ) + + +class TestCCCTable: + """Test cases verifying multi-section counting and provider-column attribution for the compliance table.""" + + def test_multi_section_fail_not_undercounted(self, capsys, tmp_path): + """A single FAIL check mapped to several sections must show FAIL(1) in + every section, not just the first one seen.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[_make_compliance("aws", ["IAM", "Logging"])] + ), + "check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]), + } + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + + get_ccc_table( + findings, + bulk_metadata, + "ccc_aws", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + # Both IAM and Logging must report FAIL(1); before the fix Logging was + # undercounted because the per-section count was gated by the global + # dedup list. + assert captured.out.count("FAIL(1)") == 2 + + def test_multi_section_muted_not_undercounted(self, capsys, tmp_path): + """A single MUTED check mapped to several sections must increase the + per-section Muted count in every section, not only the first one.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[_make_compliance("aws", ["IAM", "Logging"])] + ), + "check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]), + } + findings = [ + _make_finding("check_a", "FAIL", muted=True), + # A real FAIL is needed so the results table is rendered at all. + _make_finding("check_b", "FAIL"), + ] + + get_ccc_table( + findings, + bulk_metadata, + "ccc_aws", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out) + # The muted check belongs to both IAM and Logging, so the Muted column + # must read 1 in both rows. + muted_cells = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE) + assert len(muted_cells) == 2 + + def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path): + """The Provider column must come from the matched CCC compliance, never + from a different framework that happens to be the last entry in the + check's compliance list.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[ + _make_compliance("aws", ["IAM"]), + _make_compliance( + "leaked_provider", ["Other"], framework="OtherFramework" + ), + ] + ), + "check_b": SimpleNamespace( + Compliance=[ + _make_compliance("aws", ["IAM"]), + _make_compliance( + "leaked_provider", ["Other"], framework="OtherFramework" + ), + ] + ), + } + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + + get_ccc_table( + findings, + bulk_metadata, + "ccc_aws", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + assert "aws" in captured.out + # The provider of the unrelated trailing framework must NOT leak into + # the rendered table. + assert "leaked_provider" not in captured.out diff --git a/tests/lib/outputs/compliance/cis/cis_table_test.py b/tests/lib/outputs/compliance/cis/cis_table_test.py new file mode 100644 index 0000000000..c47e1387a0 --- /dev/null +++ b/tests/lib/outputs/compliance/cis/cis_table_test.py @@ -0,0 +1,162 @@ +import re +from types import SimpleNamespace + +from prowler.lib.outputs.compliance.cis.cis import get_cis_table + + +def _strip_ansi(text): + return re.sub(r"\x1b\[[0-9;]*m", "", text) + + +def _make_finding(check_id, status="PASS", muted=False): + return SimpleNamespace( + check_metadata=SimpleNamespace(CheckID=check_id), + status=status, + muted=muted, + ) + + +def _attr(section, profile="Level 1"): + return SimpleNamespace(Section=section, Profile=profile) + + +def _make_compliance(provider, attributes, version="1.4", framework="CIS"): + """Build a per-check CIS compliance with the given (section, profile) attrs.""" + return SimpleNamespace( + Framework=framework, + Version=version, + Provider=provider, + Requirements=[SimpleNamespace(Attributes=attributes)], + ) + + +def _make_compliance_multi_req(provider, attributes, version="1.4", framework="CIS"): + """Build a per-check CIS compliance where each attr is its own requirement, + simulating a check that appears in several requirements.""" + return SimpleNamespace( + Framework=framework, + Version=version, + Provider=provider, + Requirements=[SimpleNamespace(Attributes=[attr]) for attr in attributes], + ) + + +class TestCISTable: + """Verify multi-section counting and provider-column attribution for the CIS compliance table.""" + + def test_muted_multi_section_not_undercounted(self, capsys, tmp_path): + """A single MUTED finding mapped to several sections must increment the + per-section Muted column for every section, not only the first seen. + + CIS counts FAIL/PASS through Level 1/Level 2 buckets, so only the Muted + per-section count was affected by the undercount bug. + """ + bulk_metadata = { + # check_a is muted and belongs to two sections at once. + "check_a": SimpleNamespace( + Compliance=[ + _make_compliance("aws", [_attr("1 IAM"), _attr("2 Logging")]) + ] + ), + # A real (non-muted) finding so the table is rendered. + "check_b": SimpleNamespace( + Compliance=[_make_compliance("aws", [_attr("1 IAM")])] + ), + } + findings = [ + _make_finding("check_a", "FAIL", muted=True), + _make_finding("check_b", "PASS"), + ] + + get_cis_table( + findings, + bulk_metadata, + "cis_1.4_aws", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + plain = _strip_ansi(captured.out) + # Both section rows must carry a Muted count of 1 in their last cell. + # Before the fix only the first section seen got incremented. + muted_one_rows = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE) + assert len(muted_one_rows) == 2 + + def test_same_section_level_not_double_counted(self, capsys, tmp_path): + """A single finding whose check maps to several requirements that share + the same section and profile must count once for that section/level, + not once per requirement (FAIL(1), never FAIL(2)).""" + bulk_metadata = { + # check_a is a single FAIL mapped to two requirements, both in the + # same section "1 IAM" and the same profile "Level 1". + "check_a": SimpleNamespace( + Compliance=[ + _make_compliance_multi_req("aws", [_attr("1 IAM"), _attr("1 IAM")]) + ] + ), + # A second finding in another section so the table renders. + "check_b": SimpleNamespace( + Compliance=[_make_compliance("aws", [_attr("2 Logging")])] + ), + } + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + + get_cis_table( + findings, + bulk_metadata, + "cis_1.4_aws", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + plain = _strip_ansi(captured.out) + # The "1 IAM" row must show FAIL(1) for Level 1, never FAIL(2). + assert "FAIL(1)" in plain + assert "FAIL(2)" not in plain + + def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path): + """The Provider column must come from the matched CIS compliance, not + from a different framework that trails it in the compliance list.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[ + _make_compliance("aws", [_attr("1 IAM")]), + _make_compliance( + "gcp", [_attr("Other")], framework="OtherFramework" + ), + ] + ), + "check_b": SimpleNamespace( + Compliance=[ + _make_compliance("aws", [_attr("1 IAM")]), + _make_compliance( + "gcp", [_attr("Other")], framework="OtherFramework" + ), + ] + ), + } + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + + get_cis_table( + findings, + bulk_metadata, + "cis_1.4_aws", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + assert "aws" in captured.out + # The trailing unrelated framework's provider must not leak in. + assert "gcp" not in captured.out diff --git a/tests/lib/outputs/compliance/ens/ens_table_test.py b/tests/lib/outputs/compliance/ens/ens_table_test.py new file mode 100644 index 0000000000..2373885dbc --- /dev/null +++ b/tests/lib/outputs/compliance/ens/ens_table_test.py @@ -0,0 +1,235 @@ +import re +from types import SimpleNamespace + +from prowler.lib.outputs.compliance.ens.ens import get_ens_table + + +def _strip_ansi(text): + return re.sub(r"\x1b\[[0-9;]*m", "", text) + + +def _make_finding(check_id, status="PASS", muted=False): + return SimpleNamespace( + check_metadata=SimpleNamespace(CheckID=check_id), + status=status, + muted=muted, + ) + + +def _attr(marco, categoria, tipo="requisito", nivel="alto"): + return SimpleNamespace(Marco=marco, Categoria=categoria, Tipo=tipo, Nivel=nivel) + + +def _make_compliance(provider, attributes, framework="ENS"): + """Build a per-check ENS compliance with the given marco/categoria attrs.""" + return SimpleNamespace( + Framework=framework, + Provider=provider, + Requirements=[SimpleNamespace(Attributes=attributes)], + ) + + +class TestENSTable: + """Test cases for ENS compliance table rendering. + + Verify multi-marco counting and provider-column attribution for the + compliance table. + """ + + def test_no_cumple_marked_in_every_marco(self, capsys, tmp_path): + """A single failing finding mapped to several marcos must mark every + one of them as NO CUMPLE, not only the first marco seen.""" + bulk_metadata = { + # check_a fails and belongs to two distinct marcos/categorias. + "check_a": SimpleNamespace( + Compliance=[ + _make_compliance( + "aws", + [ + _attr("operacional", "control de acceso"), + _attr("organizativo", "politica de seguridad"), + ], + ) + ] + ), + # A passing finding so the overview total reaches 2. + "check_b": SimpleNamespace( + Compliance=[ + _make_compliance("aws", [_attr("operacional", "control de acceso")]) + ] + ), + } + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + + get_ens_table( + findings, + bulk_metadata, + "ens_rd2022_aws", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + plain = _strip_ansi(captured.out) + # Both marco rows the failing finding maps to must read NO CUMPLE. + # Before the fix only the first marco was marked, the second stayed + # CUMPLE. Anchor the assertion to the actual marco rows (not the + # overview header line which also mentions NO CUMPLE). + op_row = [ + line + for line in plain.splitlines() + if "operacional/control de acceso" in line + ] + org_row = [ + line + for line in plain.splitlines() + if "organizativo/politica de seguridad" in line + ] + assert len(op_row) == 1 and "NO CUMPLE" in op_row[0] + assert len(org_row) == 1 and "NO CUMPLE" in org_row[0] + + def test_recomendacion_does_not_set_no_cumple(self, capsys, tmp_path): + """A FAIL on a 'recomendacion' attribute must not flip a marco to + NO CUMPLE (this path is intentionally excluded from the fix).""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[ + _make_compliance( + "aws", + [ + _attr( + "operacional", "control de acceso", tipo="recomendacion" + ) + ], + ) + ] + ), + "check_b": SimpleNamespace( + Compliance=[ + _make_compliance("aws", [_attr("organizativo", "politica")]) + ] + ), + # A regular (non-recomendacion) check so the results table renders + # at least one marco row and the assertion below is not vacuous. + "check_c": SimpleNamespace( + Compliance=[ + _make_compliance("aws", [_attr("operacional", "continuidad")]) + ] + ), + } + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + _make_finding("check_c", "PASS"), + ] + + get_ens_table( + findings, + bulk_metadata, + "ens_rd2022_aws", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + plain = _strip_ansi(captured.out) + # The recomendacion FAIL must not appear as a NO CUMPLE marco row in the + # results table (the overview header line is allowed to mention it). + marco_rows = [ + line + for line in plain.splitlines() + if "operacional" in line or "organizativo" in line + ] + # Guard against a vacuous pass: the table must actually render rows. + assert marco_rows + assert all("NO CUMPLE" not in line for line in marco_rows) + + def test_muted_multi_marco_not_undercounted(self, capsys, tmp_path): + """A single MUTED finding mapped to several marcos must increment the + per-marco Muted column for every marco, not only the first seen.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[ + _make_compliance( + "aws", + [ + _attr("operacional", "control de acceso"), + _attr("organizativo", "politica de seguridad"), + ], + ) + ] + ), + "check_b": SimpleNamespace( + Compliance=[ + _make_compliance("aws", [_attr("operacional", "control de acceso")]) + ] + ), + } + findings = [ + _make_finding("check_a", "FAIL", muted=True), + _make_finding("check_b", "FAIL"), + ] + + get_ens_table( + findings, + bulk_metadata, + "ens_rd2022_aws", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + plain = _strip_ansi(captured.out) + # Both marco rows the muted finding maps to must report a Muted count of + # 1 in their last cell. + muted_one_rows = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE) + assert len(muted_one_rows) == 2 + + def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path): + """The Proveedor column must come from the matched ENS compliance, not + from a different framework that trails it in the compliance list.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[ + _make_compliance( + "aws", [_attr("operacional", "control de acceso")] + ), + _make_compliance( + "gcp", [_attr("x", "y")], framework="OtherFramework" + ), + ] + ), + "check_b": SimpleNamespace( + Compliance=[ + _make_compliance( + "aws", [_attr("operacional", "control de acceso")] + ), + _make_compliance( + "gcp", [_attr("x", "y")], framework="OtherFramework" + ), + ] + ), + } + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + + get_ens_table( + findings, + bulk_metadata, + "ens_rd2022_aws", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + assert "aws" in captured.out + assert "gcp" not in captured.out diff --git a/tests/lib/outputs/compliance/kisa_ismsp/__init__.py b/tests/lib/outputs/compliance/kisa_ismsp/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/lib/outputs/compliance/kisa_ismsp/kisa_ismsp_table_test.py b/tests/lib/outputs/compliance/kisa_ismsp/kisa_ismsp_table_test.py new file mode 100644 index 0000000000..be0fe25ad5 --- /dev/null +++ b/tests/lib/outputs/compliance/kisa_ismsp/kisa_ismsp_table_test.py @@ -0,0 +1,137 @@ +import re +from types import SimpleNamespace + +from prowler.lib.outputs.compliance.kisa_ismsp.kisa_ismsp import get_kisa_ismsp_table + +# The generator matches a compliance when its Framework starts with "KISA" and +# its Version is contained in the compliance_framework argument. +COMPLIANCE_FRAMEWORK = "kisa-isms-p-2023_aws" + + +def _make_finding(check_id, status="PASS", muted=False): + return SimpleNamespace( + check_metadata=SimpleNamespace(CheckID=check_id), + status=status, + muted=muted, + ) + + +def _make_compliance( + provider, sections, framework="KISA-ISMS-P", version="kisa-isms-p-2023" +): + """Build a per-check compliance covering the given sections.""" + return SimpleNamespace( + Framework=framework, + Version=version, + Provider=provider, + Requirements=[ + SimpleNamespace(Attributes=[SimpleNamespace(Section=section)]) + for section in sections + ], + ) + + +class TestKISAISMSPTable: + """Verify multi-section counting and provider-column attribution for the KISA ISMS-P compliance table.""" + + def test_multi_section_fail_not_undercounted(self, capsys, tmp_path): + """A single FAIL check mapped to several sections must show FAIL(1) in + every section, not just the first one seen.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[_make_compliance("aws", ["IAM", "Logging"])] + ), + "check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]), + } + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + + get_kisa_ismsp_table( + findings, + bulk_metadata, + COMPLIANCE_FRAMEWORK, + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + # Both IAM and Logging must report FAIL(1); before the fix Logging was + # undercounted because the per-section count was gated by the global + # dedup list. + assert captured.out.count("FAIL(1)") == 2 + + def test_multi_section_muted_not_undercounted(self, capsys, tmp_path): + """A single MUTED check mapped to several sections must increase the + per-section Muted count in every section, not only the first one.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[_make_compliance("aws", ["IAM", "Logging"])] + ), + "check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]), + } + findings = [ + _make_finding("check_a", "FAIL", muted=True), + # A real FAIL is needed so the results table is rendered at all. + _make_finding("check_b", "FAIL"), + ] + + get_kisa_ismsp_table( + findings, + bulk_metadata, + COMPLIANCE_FRAMEWORK, + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out) + # The muted check belongs to both IAM and Logging, so the Muted column + # must read 1 in both rows. + muted_cells = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE) + assert len(muted_cells) == 2 + + def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path): + """The Provider column must come from the matched KISA compliance, never + from a different framework that happens to be the last entry in the + check's compliance list.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[ + _make_compliance("aws", ["IAM"]), + _make_compliance( + "leaked_provider", ["Other"], framework="OtherFramework" + ), + ] + ), + "check_b": SimpleNamespace( + Compliance=[ + _make_compliance("aws", ["IAM"]), + _make_compliance( + "leaked_provider", ["Other"], framework="OtherFramework" + ), + ] + ), + } + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + + get_kisa_ismsp_table( + findings, + bulk_metadata, + COMPLIANCE_FRAMEWORK, + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + assert "aws" in captured.out + # The provider of the unrelated trailing framework must NOT leak into + # the rendered table. + assert "leaked_provider" not in captured.out diff --git a/tests/lib/outputs/compliance/mitre_attack/__init__.py b/tests/lib/outputs/compliance/mitre_attack/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/lib/outputs/compliance/mitre_attack/mitre_attack_table_test.py b/tests/lib/outputs/compliance/mitre_attack/mitre_attack_table_test.py new file mode 100644 index 0000000000..3bd69b44e8 --- /dev/null +++ b/tests/lib/outputs/compliance/mitre_attack/mitre_attack_table_test.py @@ -0,0 +1,140 @@ +import re +from types import SimpleNamespace + +from prowler.lib.outputs.compliance.mitre_attack.mitre_attack import ( + get_mitre_attack_table, +) + +# The generator matches a compliance when "MITRE-ATTACK" is in its Framework and +# its Version is contained in the compliance_framework argument. +COMPLIANCE_FRAMEWORK = "mitre_attack_aws" + + +def _make_finding(check_id, status="PASS", muted=False): + return SimpleNamespace( + check_metadata=SimpleNamespace(CheckID=check_id), + status=status, + muted=muted, + ) + + +def _make_compliance( + provider, tactics, framework="MITRE-ATTACK", version="mitre_attack" +): + """Build a per-check compliance covering the given tactics.""" + return SimpleNamespace( + Framework=framework, + Version=version, + Provider=provider, + Requirements=[SimpleNamespace(Tactics=tactics)], + ) + + +class TestMitreAttackTable: + """Test multi-section counting and provider-column attribution for the compliance table.""" + + def test_multi_tactic_fail_not_undercounted(self, capsys, tmp_path): + """A single FAIL check mapped to several tactics must show FAIL(1) in + every tactic, not just the first one seen.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[_make_compliance("aws", ["Persistence", "Execution"])] + ), + "check_b": SimpleNamespace( + Compliance=[_make_compliance("aws", ["Persistence"])] + ), + } + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + + get_mitre_attack_table( + findings, + bulk_metadata, + COMPLIANCE_FRAMEWORK, + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + # Both Persistence and Execution must report FAIL(1); before the fix + # Execution was undercounted because the per-tactic count was gated by + # the global dedup list. + assert captured.out.count("FAIL(1)") == 2 + + def test_multi_tactic_muted_not_undercounted(self, capsys, tmp_path): + """A single MUTED check mapped to several tactics must increase the + per-tactic Muted count in every tactic, not only the first one.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[_make_compliance("aws", ["Persistence", "Execution"])] + ), + "check_b": SimpleNamespace( + Compliance=[_make_compliance("aws", ["Persistence"])] + ), + } + findings = [ + _make_finding("check_a", "FAIL", muted=True), + # A second finding is needed so the table is rendered at all. + _make_finding("check_b", "FAIL"), + ] + + get_mitre_attack_table( + findings, + bulk_metadata, + COMPLIANCE_FRAMEWORK, + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out) + # The muted check belongs to both Persistence and Execution, so the + # Muted column must read 1 in both rows. + muted_cells = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE) + assert len(muted_cells) == 2 + + def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path): + """The Provider column must come from the matched MITRE-ATTACK + compliance, never from a different framework that happens to be the last + entry in the check's compliance list.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[ + _make_compliance("aws", ["Persistence"]), + _make_compliance( + "leaked_provider", ["Other"], framework="OtherFramework" + ), + ] + ), + "check_b": SimpleNamespace( + Compliance=[ + _make_compliance("aws", ["Persistence"]), + _make_compliance( + "leaked_provider", ["Other"], framework="OtherFramework" + ), + ] + ), + } + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + + get_mitre_attack_table( + findings, + bulk_metadata, + COMPLIANCE_FRAMEWORK, + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + assert "aws" in captured.out + # The provider of the unrelated trailing framework must NOT leak into + # the rendered table. + assert "leaked_provider" not in captured.out diff --git a/tests/lib/outputs/compliance/okta_idaas_stig/__init__.py b/tests/lib/outputs/compliance/okta_idaas_stig/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/lib/outputs/compliance/okta_idaas_stig/okta_idaas_stig_table_test.py b/tests/lib/outputs/compliance/okta_idaas_stig/okta_idaas_stig_table_test.py new file mode 100644 index 0000000000..3017ea4f92 --- /dev/null +++ b/tests/lib/outputs/compliance/okta_idaas_stig/okta_idaas_stig_table_test.py @@ -0,0 +1,136 @@ +from types import SimpleNamespace + +from prowler.lib.outputs.compliance.okta_idaas_stig.okta_idaas_stig import ( + get_okta_idaas_stig_table, +) + + +def _make_finding(check_id, status="PASS", muted=False): + return SimpleNamespace( + check_metadata=SimpleNamespace(CheckID=check_id), + status=status, + muted=muted, + ) + + +def _make_compliance(provider, sections, framework="Okta-IDaaS-STIG"): + """Build a per-check compliance covering the given sections.""" + return SimpleNamespace( + Framework=framework, + Provider=provider, + Requirements=[ + SimpleNamespace(Attributes=[SimpleNamespace(Section=section)]) + for section in sections + ], + ) + + +class TestOktaIDaaSSTIGTable: + """Test cases for Okta IDaaS STIG compliance table rendering.""" + + def test_multi_section_fail_not_undercounted(self, capsys, tmp_path): + """A single FAIL check mapped to several sections must show FAIL(1) in + every section, not just the first one seen.""" + bulk_metadata = { + # check_a belongs to two sections at once. + "check_a": SimpleNamespace( + Compliance=[_make_compliance("okta", ["IAM", "Logging"])] + ), + "check_b": SimpleNamespace(Compliance=[_make_compliance("okta", ["IAM"])]), + } + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + + get_okta_idaas_stig_table( + findings, + bulk_metadata, + "okta_idaas_stig_1r2", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + # Both IAM and Logging must report FAIL(1); before the fix Logging + # was undercounted and rendered as plain PASS. + assert captured.out.count("FAIL(1)") == 2 + + def test_multi_section_muted_not_undercounted(self, capsys, tmp_path): + """A single MUTED check mapped to several sections must increase the + per-section Muted count in every section, not only the first one.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[_make_compliance("okta", ["IAM", "Logging"])] + ), + "check_b": SimpleNamespace(Compliance=[_make_compliance("okta", ["IAM"])]), + } + findings = [ + _make_finding("check_a", "FAIL", muted=True), + # A real FAIL is needed so the results table is rendered at all. + _make_finding("check_b", "FAIL"), + ] + + get_okta_idaas_stig_table( + findings, + bulk_metadata, + "okta_idaas_stig_1r2", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + # The muted check belongs to both IAM and Logging, so the Muted column + # must read 1 in both rows. Before the fix only the first section seen + # was incremented, leaving the second at 0. + # Strip ANSI color codes before counting the bare values per row. + import re + + plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out) + # Each section row ends with its Muted value in its own cell; both rows + # must carry a Muted count of 1. + muted_cells = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE) + assert len(muted_cells) == 2 + + def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path): + """The Provider column must come from the matched Okta-IDaaS-STIG + compliance, never from a different framework that happens to be the + last entry in the check's compliance list.""" + # check_a maps to Okta-IDaaS-STIG (provider "okta") but its compliance + # list ends with a *different* framework whose provider is "aws". With + # the bug the leaked loop variable made the table render "aws". + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[ + _make_compliance("okta", ["IAM"]), + _make_compliance("aws", ["Other"], framework="OtherFramework"), + ] + ), + "check_b": SimpleNamespace( + Compliance=[ + _make_compliance("okta", ["IAM"]), + _make_compliance("aws", ["Other"], framework="OtherFramework"), + ] + ), + } + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + + get_okta_idaas_stig_table( + findings, + bulk_metadata, + "okta_idaas_stig_1r2", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + assert "okta" in captured.out + # The provider of the unrelated trailing framework must NOT leak into + # the rendered table. + assert "aws" not in captured.out diff --git a/tests/lib/outputs/compliance/prowler_threatscore/__init__.py b/tests/lib/outputs/compliance/prowler_threatscore/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/lib/outputs/compliance/prowler_threatscore/prowler_threatscore_table_test.py b/tests/lib/outputs/compliance/prowler_threatscore/prowler_threatscore_table_test.py new file mode 100644 index 0000000000..d754395dc3 --- /dev/null +++ b/tests/lib/outputs/compliance/prowler_threatscore/prowler_threatscore_table_test.py @@ -0,0 +1,147 @@ +import re +from types import SimpleNamespace +from unittest import mock + +from prowler.lib.outputs.compliance.prowler_threatscore.prowler_threatscore import ( + get_prowler_threatscore_table, +) + +# Patch target for the Compliance.get_bulk lookup used to render pillars without +# findings; the tests don't exercise that path so it returns nothing. +COMPLIANCE_PATH = ( + "prowler.lib.outputs.compliance.prowler_threatscore.prowler_threatscore.Compliance" +) + + +def _make_finding(check_id, status="PASS", muted=False): + return SimpleNamespace( + check_metadata=SimpleNamespace(CheckID=check_id), + status=status, + muted=muted, + ) + + +def _make_compliance(provider, pillars, framework="ProwlerThreatScore"): + """Build a per-check compliance covering the given pillars (Section).""" + return SimpleNamespace( + Framework=framework, + Provider=provider, + Requirements=[ + SimpleNamespace( + Attributes=[SimpleNamespace(Section=pillar, LevelOfRisk=5, Weight=100)] + ) + for pillar in pillars + ], + ) + + +class TestProwlerThreatScoreTable: + """Verify multi-section counting and provider-column attribution for the compliance table.""" + + def test_multi_pillar_fail_not_undercounted(self, capsys, tmp_path): + """A single FAIL check mapped to several pillars must show FAIL(1) in + every pillar, not just the first one seen.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[_make_compliance("aws", ["IAM", "Encryption"])] + ), + "check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]), + } + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + + with mock.patch(COMPLIANCE_PATH) as compliance_mock: + compliance_mock.get_bulk.return_value = {} + get_prowler_threatscore_table( + findings, + bulk_metadata, + "prowler_threatscore_aws", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + # Both IAM and Encryption must report FAIL(1); before the fix Encryption + # was undercounted because the per-pillar count was gated by the global + # dedup list. + assert captured.out.count("FAIL(1)") == 2 + + def test_multi_pillar_muted_not_undercounted(self, capsys, tmp_path): + """A single MUTED check mapped to several pillars must increase the + per-pillar Muted count in every pillar, not only the first one.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[_make_compliance("aws", ["IAM", "Encryption"])] + ), + "check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]), + } + findings = [ + _make_finding("check_a", "FAIL", muted=True), + # A real FAIL is needed so the results table is rendered at all. + _make_finding("check_b", "FAIL"), + ] + + with mock.patch(COMPLIANCE_PATH) as compliance_mock: + compliance_mock.get_bulk.return_value = {} + get_prowler_threatscore_table( + findings, + bulk_metadata, + "prowler_threatscore_aws", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out) + # The muted check belongs to both IAM and Encryption, so the Muted + # column must read 1 in both rows. + muted_cells = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE) + assert len(muted_cells) == 2 + + def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path): + """The Provider column must come from the matched ProwlerThreatScore + compliance, never from a different framework that happens to be the last + entry in the check's compliance list.""" + bulk_metadata = { + "check_a": SimpleNamespace( + Compliance=[ + _make_compliance("aws", ["IAM"]), + _make_compliance( + "leaked_provider", ["Other"], framework="OtherFramework" + ), + ] + ), + "check_b": SimpleNamespace( + Compliance=[ + _make_compliance("aws", ["IAM"]), + _make_compliance( + "leaked_provider", ["Other"], framework="OtherFramework" + ), + ] + ), + } + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + + with mock.patch(COMPLIANCE_PATH) as compliance_mock: + compliance_mock.get_bulk.return_value = {} + get_prowler_threatscore_table( + findings, + bulk_metadata, + "prowler_threatscore_aws", + "output", + str(tmp_path), + False, + ) + + captured = capsys.readouterr() + assert "aws" in captured.out + # The provider of the unrelated trailing framework must NOT leak into + # the rendered table. + assert "leaked_provider" not in captured.out diff --git a/tests/lib/outputs/compliance/universal/universal_table_test.py b/tests/lib/outputs/compliance/universal/universal_table_test.py index 7598c43c8b..abe3d0c3d0 100644 --- a/tests/lib/outputs/compliance/universal/universal_table_test.py +++ b/tests/lib/outputs/compliance/universal/universal_table_test.py @@ -1,3 +1,4 @@ +import re from types import SimpleNamespace from unittest.mock import MagicMock @@ -26,6 +27,10 @@ def _make_finding(check_id, status="PASS", muted=False): return finding +def _strip_ansi(text): + return re.sub(r"\x1b\[[0-9;]*m", "", text) + + def _make_framework(requirements, table_config, provider="AWS"): return ComplianceFramework( framework="TestFW", @@ -39,6 +44,8 @@ def _make_framework(requirements, table_config, provider="AWS"): class TestBuildRequirementCheckMap: + """Test cases for building the requirement-to-check map of a framework.""" + def test_basic(self): reqs = [ UniversalComplianceRequirement( @@ -103,6 +110,8 @@ class TestBuildRequirementCheckMap: class TestGetGroupKey: + """Test cases for resolving the group key of a requirement.""" + def test_normal_field(self): req = UniversalComplianceRequirement( id="1.1", @@ -124,7 +133,9 @@ class TestGetGroupKey: class TestGroupedMode: - def test_grouped_rendering(self, capsys): + """Test cases for grouped-mode universal compliance table rendering.""" + + def test_grouped_rendering(self, capsys, tmp_path): reqs = [ UniversalComplianceRequirement( id="1.1", @@ -156,7 +167,7 @@ class TestGroupedMode: bulk_metadata, "test_fw", "output", - "/tmp", + str(tmp_path), False, framework=fw, ) @@ -167,9 +178,118 @@ class TestGroupedMode: assert "PASS" in captured.out assert "FAIL" in captured.out + def test_grouped_multi_section_no_undercount(self, capsys, tmp_path): + """A single check mapped to several sections must be counted in + every section it belongs to, not only the first one seen.""" + reqs = [ + UniversalComplianceRequirement( + id="1.1", + description="test", + attributes={"Section": "IAM"}, + checks={"aws": ["check_a", "check_b"]}, + ), + UniversalComplianceRequirement( + id="2.1", + description="test2", + attributes={"Section": "Logging"}, + checks={"aws": ["check_a"]}, + ), + ] + tc = TableConfig(group_by="Section") + fw = _make_framework(reqs, tc) + + # check_a (FAIL) belongs to both IAM and Logging sections; check_b + # (PASS, IAM only) is added so the overview total reaches 2 and the + # results table is rendered. + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + bulk_metadata = { + "check_a": MagicMock(Compliance=[]), + "check_b": MagicMock(Compliance=[]), + } + + get_universal_table( + findings, + bulk_metadata, + "test_fw", + "output", + str(tmp_path), + False, + framework=fw, + ) + + captured = capsys.readouterr() + plain = _strip_ansi(captured.out) + # Both the IAM and Logging rows must report FAIL(1). Before the fix the + # second section seen (Logging) was undercounted to FAIL(0) and rendered + # as PASS. Anchor each occurrence to its own table row so an unrelated + # "FAIL(1)" elsewhere cannot mask an undercount. + iam_row = [ + line for line in plain.splitlines() if "IAM" in line and "FAIL(1)" in line + ] + logging_row = [ + line + for line in plain.splitlines() + if "Logging" in line and "FAIL(1)" in line + ] + assert len(iam_row) == 1 + assert len(logging_row) == 1 + + def test_grouped_multi_section_muted_not_undercounted(self, capsys, tmp_path): + """A single MUTED finding mapped to several groups must be counted in + the per-group Muted column of every group it belongs to.""" + reqs = [ + UniversalComplianceRequirement( + id="1.1", + description="test", + attributes={"Section": "IAM"}, + checks={"aws": ["check_a", "check_b"]}, + ), + UniversalComplianceRequirement( + id="2.1", + description="test2", + attributes={"Section": "Logging"}, + checks={"aws": ["check_a"]}, + ), + ] + tc = TableConfig(group_by="Section") + fw = _make_framework(reqs, tc) + + # check_a is MUTED and belongs to both IAM and Logging; check_b is a + # plain FAIL so the overview total reaches 2 and the table is rendered. + findings = [ + _make_finding("check_a", "FAIL", muted=True), + _make_finding("check_b", "FAIL"), + ] + bulk_metadata = { + "check_a": MagicMock(Compliance=[]), + "check_b": MagicMock(Compliance=[]), + } + + get_universal_table( + findings, + bulk_metadata, + "test_fw", + "output", + str(tmp_path), + False, + framework=fw, + ) + + captured = capsys.readouterr() + plain = _strip_ansi(captured.out) + # The muted finding belongs to both sections, so both the IAM row and + # the Logging row must carry a Muted count of 1 in their last cell. + muted_one_rows = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE) + assert len(muted_one_rows) == 2 + class TestSplitMode: - def test_split_rendering(self, capsys): + """Test cases for split-mode universal compliance table rendering.""" + + def test_split_rendering(self, capsys, tmp_path): reqs = [ UniversalComplianceRequirement( id="1.1", @@ -204,7 +324,7 @@ class TestSplitMode: bulk_metadata, "test_fw", "output", - "/tmp", + str(tmp_path), False, framework=fw, ) @@ -214,9 +334,119 @@ class TestSplitMode: assert "Level 1" in captured.out assert "Level 2" in captured.out + def test_split_muted_multi_section_not_undercounted(self, capsys, tmp_path): + """In split mode a single MUTED finding mapped to several groups must + be counted in the Muted column of every group it belongs to.""" + reqs = [ + UniversalComplianceRequirement( + id="1.1", + description="test", + attributes={"Section": "Storage", "Profile": "Level 1"}, + checks={"aws": ["check_a", "check_b"]}, + ), + UniversalComplianceRequirement( + id="2.1", + description="test2", + attributes={"Section": "Logging", "Profile": "Level 1"}, + checks={"aws": ["check_a"]}, + ), + ] + tc = TableConfig( + group_by="Section", + split_by=SplitByConfig(field="Profile", values=["Level 1", "Level 2"]), + ) + fw = _make_framework(reqs, tc) + + # check_a is MUTED and belongs to both Storage and Logging; check_b is a + # plain FAIL so the table is rendered. + findings = [ + _make_finding("check_a", "FAIL", muted=True), + _make_finding("check_b", "FAIL"), + ] + bulk_metadata = { + "check_a": MagicMock(Compliance=[]), + "check_b": MagicMock(Compliance=[]), + } + + get_universal_table( + findings, + bulk_metadata, + "test_fw", + "output", + str(tmp_path), + False, + framework=fw, + ) + + captured = capsys.readouterr() + plain = _strip_ansi(captured.out) + # Both section rows must carry a Muted count of 1 (last cell). Before the + # fix only the first group seen incremented Muted, leaving the other 0. + muted_one_rows = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE) + assert len(muted_one_rows) == 2 + + def test_split_same_group_value_not_double_counted(self, capsys, tmp_path): + """A single finding whose check maps to several requirements that share + the same group and split value must count once for that group/split, + not once per requirement (FAIL(1), never FAIL(2)).""" + reqs = [ + # check_a appears in two requirements, both Storage / Level 1. + UniversalComplianceRequirement( + id="1.1", + description="test", + attributes={"Section": "Storage", "Profile": "Level 1"}, + checks={"aws": ["check_a"]}, + ), + UniversalComplianceRequirement( + id="1.2", + description="test2", + attributes={"Section": "Storage", "Profile": "Level 1"}, + checks={"aws": ["check_a"]}, + ), + # A second group so the table renders with more than one finding. + UniversalComplianceRequirement( + id="2.1", + description="test3", + attributes={"Section": "Logging", "Profile": "Level 1"}, + checks={"aws": ["check_b"]}, + ), + ] + tc = TableConfig( + group_by="Section", + split_by=SplitByConfig(field="Profile", values=["Level 1", "Level 2"]), + ) + fw = _make_framework(reqs, tc) + + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + bulk_metadata = { + "check_a": MagicMock(Compliance=[]), + "check_b": MagicMock(Compliance=[]), + } + + get_universal_table( + findings, + bulk_metadata, + "test_fw", + "output", + str(tmp_path), + False, + framework=fw, + ) + + captured = capsys.readouterr() + plain = _strip_ansi(captured.out) + # The Storage row must show FAIL(1) for Level 1, never FAIL(2). + assert "FAIL(1)" in plain + assert "FAIL(2)" not in plain + class TestScoredMode: - def test_scored_rendering(self, capsys): + """Test cases for scored-mode universal compliance table rendering.""" + + def test_scored_rendering(self, capsys, tmp_path): reqs = [ UniversalComplianceRequirement( id="1.1", @@ -251,7 +481,7 @@ class TestScoredMode: bulk_metadata, "test_fw", "output", - "/tmp", + str(tmp_path), False, framework=fw, ) @@ -261,9 +491,68 @@ class TestScoredMode: assert "Score" in captured.out assert "Threat Score" in captured.out + def test_scored_multi_section_fail_not_undercounted(self, capsys, tmp_path): + """In scored mode a single FAIL finding mapped to several groups must + show FAIL(1) in every group it belongs to, not only the first one.""" + reqs = [ + UniversalComplianceRequirement( + id="1.1", + description="test", + attributes={"Section": "IAM", "LevelOfRisk": 5, "Weight": 100}, + checks={"aws": ["check_a", "check_b"]}, + ), + UniversalComplianceRequirement( + id="2.1", + description="test2", + attributes={"Section": "Logging", "LevelOfRisk": 3, "Weight": 50}, + checks={"aws": ["check_a"]}, + ), + ] + tc = TableConfig( + group_by="Section", + scoring=ScoringConfig(risk_field="LevelOfRisk", weight_field="Weight"), + ) + fw = _make_framework(reqs, tc) + + # check_a (FAIL) belongs to both IAM and Logging; check_b (PASS, IAM + # only) raises the overview total to 2 so the table is rendered. + findings = [ + _make_finding("check_a", "FAIL"), + _make_finding("check_b", "PASS"), + ] + bulk_metadata = { + "check_a": MagicMock(Compliance=[]), + "check_b": MagicMock(Compliance=[]), + } + + get_universal_table( + findings, + bulk_metadata, + "test_fw", + "output", + str(tmp_path), + False, + framework=fw, + ) + + captured = capsys.readouterr() + plain = _strip_ansi(captured.out) + iam_row = [ + line for line in plain.splitlines() if "IAM" in line and "FAIL(1)" in line + ] + logging_row = [ + line + for line in plain.splitlines() + if "Logging" in line and "FAIL(1)" in line + ] + assert len(iam_row) == 1 + assert len(logging_row) == 1 + class TestCustomLabels: - def test_ens_spanish_labels(self, capsys): + """Test cases for custom-label universal compliance table rendering.""" + + def test_ens_spanish_labels(self, capsys, tmp_path): reqs = [ UniversalComplianceRequirement( id="1.1", @@ -300,7 +589,7 @@ class TestCustomLabels: bulk_metadata, "test_fw", "output", - "/tmp", + str(tmp_path), False, framework=fw, ) @@ -311,7 +600,9 @@ class TestCustomLabels: class TestMultiProviderDictChecks: - def test_only_aws_checks_matched(self, capsys): + """Test cases for multi-provider dict checks in the universal table.""" + + def test_only_aws_checks_matched(self, capsys, tmp_path): """With dict checks and provider='aws', only AWS checks match findings.""" reqs = [ UniversalComplianceRequirement( @@ -352,7 +643,7 @@ class TestMultiProviderDictChecks: bulk_metadata, "multi_cloud", "output", - "/tmp", + str(tmp_path), False, framework=fw, provider="aws", @@ -366,7 +657,9 @@ class TestMultiProviderDictChecks: class TestNoTableConfig: - def test_returns_early_without_table_config(self, capsys): + """Test cases for the universal table when no table config is present.""" + + def test_returns_early_without_table_config(self, capsys, tmp_path): fw = ComplianceFramework( framework="TestFW", name="Test", @@ -374,11 +667,11 @@ class TestNoTableConfig: description="Test", requirements=[], ) - get_universal_table([], {}, "test", "out", "/tmp", False, framework=fw) + get_universal_table([], {}, "test", "out", str(tmp_path), False, framework=fw) captured = capsys.readouterr() assert captured.out == "" - def test_returns_early_without_framework(self, capsys): - get_universal_table([], {}, "test", "out", "/tmp", False, framework=None) + def test_returns_early_without_framework(self, capsys, tmp_path): + get_universal_table([], {}, "test", "out", str(tmp_path), False, framework=None) captured = capsys.readouterr() assert captured.out == ""