mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-07-04 19:21:51 +00:00
fix(compliance): multi-section undercount & leaked provider tab (#11567)
This commit is contained in:
@@ -0,0 +1,132 @@
|
||||
import re
|
||||
from types import SimpleNamespace
|
||||
|
||||
from prowler.lib.outputs.compliance.asd_essential_eight.asd_essential_eight import (
|
||||
get_asd_essential_eight_table,
|
||||
)
|
||||
|
||||
|
||||
def _make_finding(check_id, status="PASS", muted=False):
|
||||
return SimpleNamespace(
|
||||
check_metadata=SimpleNamespace(CheckID=check_id),
|
||||
status=status,
|
||||
muted=muted,
|
||||
)
|
||||
|
||||
|
||||
def _make_compliance(provider, sections, framework="ASD-Essential-Eight"):
|
||||
"""Build a per-check compliance covering the given sections."""
|
||||
return SimpleNamespace(
|
||||
Framework=framework,
|
||||
Provider=provider,
|
||||
Requirements=[
|
||||
SimpleNamespace(Attributes=[SimpleNamespace(Section=section)])
|
||||
for section in sections
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class TestASDEssentialEightTable:
|
||||
"""Test cases verifying multi-section counting and provider-column attribution for the ASD Essential Eight compliance table."""
|
||||
|
||||
def test_multi_section_fail_not_undercounted(self, capsys, tmp_path):
|
||||
"""A single FAIL check mapped to several sections must show FAIL(1) in
|
||||
every section, not just the first one seen."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[_make_compliance("aws", ["IAM", "Logging"])]
|
||||
),
|
||||
"check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
|
||||
get_asd_essential_eight_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"asd_essential_eight_aws",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
# Both IAM and Logging must report FAIL(1); before the fix Logging was
|
||||
# undercounted because the per-section count was gated by the global
|
||||
# dedup list.
|
||||
assert captured.out.count("FAIL(1)") == 2
|
||||
|
||||
def test_multi_section_muted_not_undercounted(self, capsys, tmp_path):
|
||||
"""A single MUTED check mapped to several sections must increase the
|
||||
per-section Muted count in every section, not only the first one."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[_make_compliance("aws", ["IAM", "Logging"])]
|
||||
),
|
||||
"check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL", muted=True),
|
||||
# A real FAIL is needed so the results table is rendered at all.
|
||||
_make_finding("check_b", "FAIL"),
|
||||
]
|
||||
|
||||
get_asd_essential_eight_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"asd_essential_eight_aws",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out)
|
||||
# The muted check belongs to both IAM and Logging, so the Muted column
|
||||
# must read 1 in both rows.
|
||||
muted_cells = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE)
|
||||
assert len(muted_cells) == 2
|
||||
|
||||
def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path):
|
||||
"""The Provider column must come from the matched ASD-Essential-Eight
|
||||
compliance, never from a different framework that happens to be the last
|
||||
entry in the check's compliance list."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("aws", ["IAM"]),
|
||||
_make_compliance(
|
||||
"leaked_provider", ["Other"], framework="OtherFramework"
|
||||
),
|
||||
]
|
||||
),
|
||||
"check_b": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("aws", ["IAM"]),
|
||||
_make_compliance(
|
||||
"leaked_provider", ["Other"], framework="OtherFramework"
|
||||
),
|
||||
]
|
||||
),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
|
||||
get_asd_essential_eight_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"asd_essential_eight_aws",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "aws" in captured.out
|
||||
# The provider of the unrelated trailing framework must NOT leak into
|
||||
# the rendered table.
|
||||
assert "leaked_provider" not in captured.out
|
||||
@@ -0,0 +1,130 @@
|
||||
import re
|
||||
from types import SimpleNamespace
|
||||
|
||||
from prowler.lib.outputs.compliance.c5.c5 import get_c5_table
|
||||
|
||||
|
||||
def _make_finding(check_id, status="PASS", muted=False):
|
||||
return SimpleNamespace(
|
||||
check_metadata=SimpleNamespace(CheckID=check_id),
|
||||
status=status,
|
||||
muted=muted,
|
||||
)
|
||||
|
||||
|
||||
def _make_compliance(provider, sections, framework="C5"):
|
||||
"""Build a per-check compliance covering the given sections."""
|
||||
return SimpleNamespace(
|
||||
Framework=framework,
|
||||
Provider=provider,
|
||||
Requirements=[
|
||||
SimpleNamespace(Attributes=[SimpleNamespace(Section=section)])
|
||||
for section in sections
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class TestC5Table:
|
||||
"""Verify multi-section counting and provider-column attribution for the compliance table."""
|
||||
|
||||
def test_multi_section_fail_not_undercounted(self, capsys, tmp_path):
|
||||
"""A single FAIL check mapped to several sections must show FAIL(1) in
|
||||
every section, not just the first one seen."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[_make_compliance("aws", ["IAM", "Logging"])]
|
||||
),
|
||||
"check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
|
||||
get_c5_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"c5_aws",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
# Both IAM and Logging must report FAIL(1); before the fix Logging was
|
||||
# undercounted because the per-section count was gated by the global
|
||||
# dedup list.
|
||||
assert captured.out.count("FAIL(1)") == 2
|
||||
|
||||
def test_multi_section_muted_not_undercounted(self, capsys, tmp_path):
|
||||
"""A single MUTED check mapped to several sections must increase the
|
||||
per-section Muted count in every section, not only the first one."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[_make_compliance("aws", ["IAM", "Logging"])]
|
||||
),
|
||||
"check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL", muted=True),
|
||||
# A real FAIL is needed so the results table is rendered at all.
|
||||
_make_finding("check_b", "FAIL"),
|
||||
]
|
||||
|
||||
get_c5_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"c5_aws",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out)
|
||||
# The muted check belongs to both IAM and Logging, so the Muted column
|
||||
# must read 1 in both rows.
|
||||
muted_cells = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE)
|
||||
assert len(muted_cells) == 2
|
||||
|
||||
def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path):
|
||||
"""The Provider column must come from the matched C5 compliance, never
|
||||
from a different framework that happens to be the last entry in the
|
||||
check's compliance list."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("aws", ["IAM"]),
|
||||
_make_compliance(
|
||||
"leaked_provider", ["Other"], framework="OtherFramework"
|
||||
),
|
||||
]
|
||||
),
|
||||
"check_b": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("aws", ["IAM"]),
|
||||
_make_compliance(
|
||||
"leaked_provider", ["Other"], framework="OtherFramework"
|
||||
),
|
||||
]
|
||||
),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
|
||||
get_c5_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"c5_aws",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "aws" in captured.out
|
||||
# The provider of the unrelated trailing framework must NOT leak into
|
||||
# the rendered table.
|
||||
assert "leaked_provider" not in captured.out
|
||||
@@ -0,0 +1,130 @@
|
||||
import re
|
||||
from types import SimpleNamespace
|
||||
|
||||
from prowler.lib.outputs.compliance.ccc.ccc import get_ccc_table
|
||||
|
||||
|
||||
def _make_finding(check_id, status="PASS", muted=False):
|
||||
return SimpleNamespace(
|
||||
check_metadata=SimpleNamespace(CheckID=check_id),
|
||||
status=status,
|
||||
muted=muted,
|
||||
)
|
||||
|
||||
|
||||
def _make_compliance(provider, sections, framework="CCC"):
|
||||
"""Build a per-check compliance covering the given sections."""
|
||||
return SimpleNamespace(
|
||||
Framework=framework,
|
||||
Provider=provider,
|
||||
Requirements=[
|
||||
SimpleNamespace(Attributes=[SimpleNamespace(Section=section)])
|
||||
for section in sections
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class TestCCCTable:
|
||||
"""Test cases verifying multi-section counting and provider-column attribution for the compliance table."""
|
||||
|
||||
def test_multi_section_fail_not_undercounted(self, capsys, tmp_path):
|
||||
"""A single FAIL check mapped to several sections must show FAIL(1) in
|
||||
every section, not just the first one seen."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[_make_compliance("aws", ["IAM", "Logging"])]
|
||||
),
|
||||
"check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
|
||||
get_ccc_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"ccc_aws",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
# Both IAM and Logging must report FAIL(1); before the fix Logging was
|
||||
# undercounted because the per-section count was gated by the global
|
||||
# dedup list.
|
||||
assert captured.out.count("FAIL(1)") == 2
|
||||
|
||||
def test_multi_section_muted_not_undercounted(self, capsys, tmp_path):
|
||||
"""A single MUTED check mapped to several sections must increase the
|
||||
per-section Muted count in every section, not only the first one."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[_make_compliance("aws", ["IAM", "Logging"])]
|
||||
),
|
||||
"check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL", muted=True),
|
||||
# A real FAIL is needed so the results table is rendered at all.
|
||||
_make_finding("check_b", "FAIL"),
|
||||
]
|
||||
|
||||
get_ccc_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"ccc_aws",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out)
|
||||
# The muted check belongs to both IAM and Logging, so the Muted column
|
||||
# must read 1 in both rows.
|
||||
muted_cells = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE)
|
||||
assert len(muted_cells) == 2
|
||||
|
||||
def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path):
|
||||
"""The Provider column must come from the matched CCC compliance, never
|
||||
from a different framework that happens to be the last entry in the
|
||||
check's compliance list."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("aws", ["IAM"]),
|
||||
_make_compliance(
|
||||
"leaked_provider", ["Other"], framework="OtherFramework"
|
||||
),
|
||||
]
|
||||
),
|
||||
"check_b": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("aws", ["IAM"]),
|
||||
_make_compliance(
|
||||
"leaked_provider", ["Other"], framework="OtherFramework"
|
||||
),
|
||||
]
|
||||
),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
|
||||
get_ccc_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"ccc_aws",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "aws" in captured.out
|
||||
# The provider of the unrelated trailing framework must NOT leak into
|
||||
# the rendered table.
|
||||
assert "leaked_provider" not in captured.out
|
||||
@@ -0,0 +1,162 @@
|
||||
import re
|
||||
from types import SimpleNamespace
|
||||
|
||||
from prowler.lib.outputs.compliance.cis.cis import get_cis_table
|
||||
|
||||
|
||||
def _strip_ansi(text):
|
||||
return re.sub(r"\x1b\[[0-9;]*m", "", text)
|
||||
|
||||
|
||||
def _make_finding(check_id, status="PASS", muted=False):
|
||||
return SimpleNamespace(
|
||||
check_metadata=SimpleNamespace(CheckID=check_id),
|
||||
status=status,
|
||||
muted=muted,
|
||||
)
|
||||
|
||||
|
||||
def _attr(section, profile="Level 1"):
|
||||
return SimpleNamespace(Section=section, Profile=profile)
|
||||
|
||||
|
||||
def _make_compliance(provider, attributes, version="1.4", framework="CIS"):
|
||||
"""Build a per-check CIS compliance with the given (section, profile) attrs."""
|
||||
return SimpleNamespace(
|
||||
Framework=framework,
|
||||
Version=version,
|
||||
Provider=provider,
|
||||
Requirements=[SimpleNamespace(Attributes=attributes)],
|
||||
)
|
||||
|
||||
|
||||
def _make_compliance_multi_req(provider, attributes, version="1.4", framework="CIS"):
|
||||
"""Build a per-check CIS compliance where each attr is its own requirement,
|
||||
simulating a check that appears in several requirements."""
|
||||
return SimpleNamespace(
|
||||
Framework=framework,
|
||||
Version=version,
|
||||
Provider=provider,
|
||||
Requirements=[SimpleNamespace(Attributes=[attr]) for attr in attributes],
|
||||
)
|
||||
|
||||
|
||||
class TestCISTable:
|
||||
"""Verify multi-section counting and provider-column attribution for the CIS compliance table."""
|
||||
|
||||
def test_muted_multi_section_not_undercounted(self, capsys, tmp_path):
|
||||
"""A single MUTED finding mapped to several sections must increment the
|
||||
per-section Muted column for every section, not only the first seen.
|
||||
|
||||
CIS counts FAIL/PASS through Level 1/Level 2 buckets, so only the Muted
|
||||
per-section count was affected by the undercount bug.
|
||||
"""
|
||||
bulk_metadata = {
|
||||
# check_a is muted and belongs to two sections at once.
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("aws", [_attr("1 IAM"), _attr("2 Logging")])
|
||||
]
|
||||
),
|
||||
# A real (non-muted) finding so the table is rendered.
|
||||
"check_b": SimpleNamespace(
|
||||
Compliance=[_make_compliance("aws", [_attr("1 IAM")])]
|
||||
),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL", muted=True),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
|
||||
get_cis_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"cis_1.4_aws",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
plain = _strip_ansi(captured.out)
|
||||
# Both section rows must carry a Muted count of 1 in their last cell.
|
||||
# Before the fix only the first section seen got incremented.
|
||||
muted_one_rows = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE)
|
||||
assert len(muted_one_rows) == 2
|
||||
|
||||
def test_same_section_level_not_double_counted(self, capsys, tmp_path):
|
||||
"""A single finding whose check maps to several requirements that share
|
||||
the same section and profile must count once for that section/level,
|
||||
not once per requirement (FAIL(1), never FAIL(2))."""
|
||||
bulk_metadata = {
|
||||
# check_a is a single FAIL mapped to two requirements, both in the
|
||||
# same section "1 IAM" and the same profile "Level 1".
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance_multi_req("aws", [_attr("1 IAM"), _attr("1 IAM")])
|
||||
]
|
||||
),
|
||||
# A second finding in another section so the table renders.
|
||||
"check_b": SimpleNamespace(
|
||||
Compliance=[_make_compliance("aws", [_attr("2 Logging")])]
|
||||
),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
|
||||
get_cis_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"cis_1.4_aws",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
plain = _strip_ansi(captured.out)
|
||||
# The "1 IAM" row must show FAIL(1) for Level 1, never FAIL(2).
|
||||
assert "FAIL(1)" in plain
|
||||
assert "FAIL(2)" not in plain
|
||||
|
||||
def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path):
|
||||
"""The Provider column must come from the matched CIS compliance, not
|
||||
from a different framework that trails it in the compliance list."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("aws", [_attr("1 IAM")]),
|
||||
_make_compliance(
|
||||
"gcp", [_attr("Other")], framework="OtherFramework"
|
||||
),
|
||||
]
|
||||
),
|
||||
"check_b": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("aws", [_attr("1 IAM")]),
|
||||
_make_compliance(
|
||||
"gcp", [_attr("Other")], framework="OtherFramework"
|
||||
),
|
||||
]
|
||||
),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
|
||||
get_cis_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"cis_1.4_aws",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "aws" in captured.out
|
||||
# The trailing unrelated framework's provider must not leak in.
|
||||
assert "gcp" not in captured.out
|
||||
@@ -0,0 +1,235 @@
|
||||
import re
|
||||
from types import SimpleNamespace
|
||||
|
||||
from prowler.lib.outputs.compliance.ens.ens import get_ens_table
|
||||
|
||||
|
||||
def _strip_ansi(text):
|
||||
return re.sub(r"\x1b\[[0-9;]*m", "", text)
|
||||
|
||||
|
||||
def _make_finding(check_id, status="PASS", muted=False):
|
||||
return SimpleNamespace(
|
||||
check_metadata=SimpleNamespace(CheckID=check_id),
|
||||
status=status,
|
||||
muted=muted,
|
||||
)
|
||||
|
||||
|
||||
def _attr(marco, categoria, tipo="requisito", nivel="alto"):
|
||||
return SimpleNamespace(Marco=marco, Categoria=categoria, Tipo=tipo, Nivel=nivel)
|
||||
|
||||
|
||||
def _make_compliance(provider, attributes, framework="ENS"):
|
||||
"""Build a per-check ENS compliance with the given marco/categoria attrs."""
|
||||
return SimpleNamespace(
|
||||
Framework=framework,
|
||||
Provider=provider,
|
||||
Requirements=[SimpleNamespace(Attributes=attributes)],
|
||||
)
|
||||
|
||||
|
||||
class TestENSTable:
|
||||
"""Test cases for ENS compliance table rendering.
|
||||
|
||||
Verify multi-marco counting and provider-column attribution for the
|
||||
compliance table.
|
||||
"""
|
||||
|
||||
def test_no_cumple_marked_in_every_marco(self, capsys, tmp_path):
|
||||
"""A single failing finding mapped to several marcos must mark every
|
||||
one of them as NO CUMPLE, not only the first marco seen."""
|
||||
bulk_metadata = {
|
||||
# check_a fails and belongs to two distinct marcos/categorias.
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance(
|
||||
"aws",
|
||||
[
|
||||
_attr("operacional", "control de acceso"),
|
||||
_attr("organizativo", "politica de seguridad"),
|
||||
],
|
||||
)
|
||||
]
|
||||
),
|
||||
# A passing finding so the overview total reaches 2.
|
||||
"check_b": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("aws", [_attr("operacional", "control de acceso")])
|
||||
]
|
||||
),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
|
||||
get_ens_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"ens_rd2022_aws",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
plain = _strip_ansi(captured.out)
|
||||
# Both marco rows the failing finding maps to must read NO CUMPLE.
|
||||
# Before the fix only the first marco was marked, the second stayed
|
||||
# CUMPLE. Anchor the assertion to the actual marco rows (not the
|
||||
# overview header line which also mentions NO CUMPLE).
|
||||
op_row = [
|
||||
line
|
||||
for line in plain.splitlines()
|
||||
if "operacional/control de acceso" in line
|
||||
]
|
||||
org_row = [
|
||||
line
|
||||
for line in plain.splitlines()
|
||||
if "organizativo/politica de seguridad" in line
|
||||
]
|
||||
assert len(op_row) == 1 and "NO CUMPLE" in op_row[0]
|
||||
assert len(org_row) == 1 and "NO CUMPLE" in org_row[0]
|
||||
|
||||
def test_recomendacion_does_not_set_no_cumple(self, capsys, tmp_path):
|
||||
"""A FAIL on a 'recomendacion' attribute must not flip a marco to
|
||||
NO CUMPLE (this path is intentionally excluded from the fix)."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance(
|
||||
"aws",
|
||||
[
|
||||
_attr(
|
||||
"operacional", "control de acceso", tipo="recomendacion"
|
||||
)
|
||||
],
|
||||
)
|
||||
]
|
||||
),
|
||||
"check_b": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("aws", [_attr("organizativo", "politica")])
|
||||
]
|
||||
),
|
||||
# A regular (non-recomendacion) check so the results table renders
|
||||
# at least one marco row and the assertion below is not vacuous.
|
||||
"check_c": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("aws", [_attr("operacional", "continuidad")])
|
||||
]
|
||||
),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
_make_finding("check_c", "PASS"),
|
||||
]
|
||||
|
||||
get_ens_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"ens_rd2022_aws",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
plain = _strip_ansi(captured.out)
|
||||
# The recomendacion FAIL must not appear as a NO CUMPLE marco row in the
|
||||
# results table (the overview header line is allowed to mention it).
|
||||
marco_rows = [
|
||||
line
|
||||
for line in plain.splitlines()
|
||||
if "operacional" in line or "organizativo" in line
|
||||
]
|
||||
# Guard against a vacuous pass: the table must actually render rows.
|
||||
assert marco_rows
|
||||
assert all("NO CUMPLE" not in line for line in marco_rows)
|
||||
|
||||
def test_muted_multi_marco_not_undercounted(self, capsys, tmp_path):
|
||||
"""A single MUTED finding mapped to several marcos must increment the
|
||||
per-marco Muted column for every marco, not only the first seen."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance(
|
||||
"aws",
|
||||
[
|
||||
_attr("operacional", "control de acceso"),
|
||||
_attr("organizativo", "politica de seguridad"),
|
||||
],
|
||||
)
|
||||
]
|
||||
),
|
||||
"check_b": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("aws", [_attr("operacional", "control de acceso")])
|
||||
]
|
||||
),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL", muted=True),
|
||||
_make_finding("check_b", "FAIL"),
|
||||
]
|
||||
|
||||
get_ens_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"ens_rd2022_aws",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
plain = _strip_ansi(captured.out)
|
||||
# Both marco rows the muted finding maps to must report a Muted count of
|
||||
# 1 in their last cell.
|
||||
muted_one_rows = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE)
|
||||
assert len(muted_one_rows) == 2
|
||||
|
||||
def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path):
|
||||
"""The Proveedor column must come from the matched ENS compliance, not
|
||||
from a different framework that trails it in the compliance list."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance(
|
||||
"aws", [_attr("operacional", "control de acceso")]
|
||||
),
|
||||
_make_compliance(
|
||||
"gcp", [_attr("x", "y")], framework="OtherFramework"
|
||||
),
|
||||
]
|
||||
),
|
||||
"check_b": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance(
|
||||
"aws", [_attr("operacional", "control de acceso")]
|
||||
),
|
||||
_make_compliance(
|
||||
"gcp", [_attr("x", "y")], framework="OtherFramework"
|
||||
),
|
||||
]
|
||||
),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
|
||||
get_ens_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"ens_rd2022_aws",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "aws" in captured.out
|
||||
assert "gcp" not in captured.out
|
||||
@@ -0,0 +1,137 @@
|
||||
import re
|
||||
from types import SimpleNamespace
|
||||
|
||||
from prowler.lib.outputs.compliance.kisa_ismsp.kisa_ismsp import get_kisa_ismsp_table
|
||||
|
||||
# The generator matches a compliance when its Framework starts with "KISA" and
|
||||
# its Version is contained in the compliance_framework argument.
|
||||
COMPLIANCE_FRAMEWORK = "kisa-isms-p-2023_aws"
|
||||
|
||||
|
||||
def _make_finding(check_id, status="PASS", muted=False):
|
||||
return SimpleNamespace(
|
||||
check_metadata=SimpleNamespace(CheckID=check_id),
|
||||
status=status,
|
||||
muted=muted,
|
||||
)
|
||||
|
||||
|
||||
def _make_compliance(
|
||||
provider, sections, framework="KISA-ISMS-P", version="kisa-isms-p-2023"
|
||||
):
|
||||
"""Build a per-check compliance covering the given sections."""
|
||||
return SimpleNamespace(
|
||||
Framework=framework,
|
||||
Version=version,
|
||||
Provider=provider,
|
||||
Requirements=[
|
||||
SimpleNamespace(Attributes=[SimpleNamespace(Section=section)])
|
||||
for section in sections
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class TestKISAISMSPTable:
|
||||
"""Verify multi-section counting and provider-column attribution for the KISA ISMS-P compliance table."""
|
||||
|
||||
def test_multi_section_fail_not_undercounted(self, capsys, tmp_path):
|
||||
"""A single FAIL check mapped to several sections must show FAIL(1) in
|
||||
every section, not just the first one seen."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[_make_compliance("aws", ["IAM", "Logging"])]
|
||||
),
|
||||
"check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
|
||||
get_kisa_ismsp_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
COMPLIANCE_FRAMEWORK,
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
# Both IAM and Logging must report FAIL(1); before the fix Logging was
|
||||
# undercounted because the per-section count was gated by the global
|
||||
# dedup list.
|
||||
assert captured.out.count("FAIL(1)") == 2
|
||||
|
||||
def test_multi_section_muted_not_undercounted(self, capsys, tmp_path):
|
||||
"""A single MUTED check mapped to several sections must increase the
|
||||
per-section Muted count in every section, not only the first one."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[_make_compliance("aws", ["IAM", "Logging"])]
|
||||
),
|
||||
"check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL", muted=True),
|
||||
# A real FAIL is needed so the results table is rendered at all.
|
||||
_make_finding("check_b", "FAIL"),
|
||||
]
|
||||
|
||||
get_kisa_ismsp_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
COMPLIANCE_FRAMEWORK,
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out)
|
||||
# The muted check belongs to both IAM and Logging, so the Muted column
|
||||
# must read 1 in both rows.
|
||||
muted_cells = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE)
|
||||
assert len(muted_cells) == 2
|
||||
|
||||
def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path):
|
||||
"""The Provider column must come from the matched KISA compliance, never
|
||||
from a different framework that happens to be the last entry in the
|
||||
check's compliance list."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("aws", ["IAM"]),
|
||||
_make_compliance(
|
||||
"leaked_provider", ["Other"], framework="OtherFramework"
|
||||
),
|
||||
]
|
||||
),
|
||||
"check_b": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("aws", ["IAM"]),
|
||||
_make_compliance(
|
||||
"leaked_provider", ["Other"], framework="OtherFramework"
|
||||
),
|
||||
]
|
||||
),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
|
||||
get_kisa_ismsp_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
COMPLIANCE_FRAMEWORK,
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "aws" in captured.out
|
||||
# The provider of the unrelated trailing framework must NOT leak into
|
||||
# the rendered table.
|
||||
assert "leaked_provider" not in captured.out
|
||||
@@ -0,0 +1,140 @@
|
||||
import re
|
||||
from types import SimpleNamespace
|
||||
|
||||
from prowler.lib.outputs.compliance.mitre_attack.mitre_attack import (
|
||||
get_mitre_attack_table,
|
||||
)
|
||||
|
||||
# The generator matches a compliance when "MITRE-ATTACK" is in its Framework and
|
||||
# its Version is contained in the compliance_framework argument.
|
||||
COMPLIANCE_FRAMEWORK = "mitre_attack_aws"
|
||||
|
||||
|
||||
def _make_finding(check_id, status="PASS", muted=False):
|
||||
return SimpleNamespace(
|
||||
check_metadata=SimpleNamespace(CheckID=check_id),
|
||||
status=status,
|
||||
muted=muted,
|
||||
)
|
||||
|
||||
|
||||
def _make_compliance(
|
||||
provider, tactics, framework="MITRE-ATTACK", version="mitre_attack"
|
||||
):
|
||||
"""Build a per-check compliance covering the given tactics."""
|
||||
return SimpleNamespace(
|
||||
Framework=framework,
|
||||
Version=version,
|
||||
Provider=provider,
|
||||
Requirements=[SimpleNamespace(Tactics=tactics)],
|
||||
)
|
||||
|
||||
|
||||
class TestMitreAttackTable:
|
||||
"""Test multi-section counting and provider-column attribution for the compliance table."""
|
||||
|
||||
def test_multi_tactic_fail_not_undercounted(self, capsys, tmp_path):
|
||||
"""A single FAIL check mapped to several tactics must show FAIL(1) in
|
||||
every tactic, not just the first one seen."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[_make_compliance("aws", ["Persistence", "Execution"])]
|
||||
),
|
||||
"check_b": SimpleNamespace(
|
||||
Compliance=[_make_compliance("aws", ["Persistence"])]
|
||||
),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
|
||||
get_mitre_attack_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
COMPLIANCE_FRAMEWORK,
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
# Both Persistence and Execution must report FAIL(1); before the fix
|
||||
# Execution was undercounted because the per-tactic count was gated by
|
||||
# the global dedup list.
|
||||
assert captured.out.count("FAIL(1)") == 2
|
||||
|
||||
def test_multi_tactic_muted_not_undercounted(self, capsys, tmp_path):
|
||||
"""A single MUTED check mapped to several tactics must increase the
|
||||
per-tactic Muted count in every tactic, not only the first one."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[_make_compliance("aws", ["Persistence", "Execution"])]
|
||||
),
|
||||
"check_b": SimpleNamespace(
|
||||
Compliance=[_make_compliance("aws", ["Persistence"])]
|
||||
),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL", muted=True),
|
||||
# A second finding is needed so the table is rendered at all.
|
||||
_make_finding("check_b", "FAIL"),
|
||||
]
|
||||
|
||||
get_mitre_attack_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
COMPLIANCE_FRAMEWORK,
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out)
|
||||
# The muted check belongs to both Persistence and Execution, so the
|
||||
# Muted column must read 1 in both rows.
|
||||
muted_cells = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE)
|
||||
assert len(muted_cells) == 2
|
||||
|
||||
def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path):
|
||||
"""The Provider column must come from the matched MITRE-ATTACK
|
||||
compliance, never from a different framework that happens to be the last
|
||||
entry in the check's compliance list."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("aws", ["Persistence"]),
|
||||
_make_compliance(
|
||||
"leaked_provider", ["Other"], framework="OtherFramework"
|
||||
),
|
||||
]
|
||||
),
|
||||
"check_b": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("aws", ["Persistence"]),
|
||||
_make_compliance(
|
||||
"leaked_provider", ["Other"], framework="OtherFramework"
|
||||
),
|
||||
]
|
||||
),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
|
||||
get_mitre_attack_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
COMPLIANCE_FRAMEWORK,
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "aws" in captured.out
|
||||
# The provider of the unrelated trailing framework must NOT leak into
|
||||
# the rendered table.
|
||||
assert "leaked_provider" not in captured.out
|
||||
@@ -0,0 +1,136 @@
|
||||
from types import SimpleNamespace
|
||||
|
||||
from prowler.lib.outputs.compliance.okta_idaas_stig.okta_idaas_stig import (
|
||||
get_okta_idaas_stig_table,
|
||||
)
|
||||
|
||||
|
||||
def _make_finding(check_id, status="PASS", muted=False):
|
||||
return SimpleNamespace(
|
||||
check_metadata=SimpleNamespace(CheckID=check_id),
|
||||
status=status,
|
||||
muted=muted,
|
||||
)
|
||||
|
||||
|
||||
def _make_compliance(provider, sections, framework="Okta-IDaaS-STIG"):
|
||||
"""Build a per-check compliance covering the given sections."""
|
||||
return SimpleNamespace(
|
||||
Framework=framework,
|
||||
Provider=provider,
|
||||
Requirements=[
|
||||
SimpleNamespace(Attributes=[SimpleNamespace(Section=section)])
|
||||
for section in sections
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class TestOktaIDaaSSTIGTable:
|
||||
"""Test cases for Okta IDaaS STIG compliance table rendering."""
|
||||
|
||||
def test_multi_section_fail_not_undercounted(self, capsys, tmp_path):
|
||||
"""A single FAIL check mapped to several sections must show FAIL(1) in
|
||||
every section, not just the first one seen."""
|
||||
bulk_metadata = {
|
||||
# check_a belongs to two sections at once.
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[_make_compliance("okta", ["IAM", "Logging"])]
|
||||
),
|
||||
"check_b": SimpleNamespace(Compliance=[_make_compliance("okta", ["IAM"])]),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
|
||||
get_okta_idaas_stig_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"okta_idaas_stig_1r2",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
# Both IAM and Logging must report FAIL(1); before the fix Logging
|
||||
# was undercounted and rendered as plain PASS.
|
||||
assert captured.out.count("FAIL(1)") == 2
|
||||
|
||||
def test_multi_section_muted_not_undercounted(self, capsys, tmp_path):
|
||||
"""A single MUTED check mapped to several sections must increase the
|
||||
per-section Muted count in every section, not only the first one."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[_make_compliance("okta", ["IAM", "Logging"])]
|
||||
),
|
||||
"check_b": SimpleNamespace(Compliance=[_make_compliance("okta", ["IAM"])]),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL", muted=True),
|
||||
# A real FAIL is needed so the results table is rendered at all.
|
||||
_make_finding("check_b", "FAIL"),
|
||||
]
|
||||
|
||||
get_okta_idaas_stig_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"okta_idaas_stig_1r2",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
# The muted check belongs to both IAM and Logging, so the Muted column
|
||||
# must read 1 in both rows. Before the fix only the first section seen
|
||||
# was incremented, leaving the second at 0.
|
||||
# Strip ANSI color codes before counting the bare values per row.
|
||||
import re
|
||||
|
||||
plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out)
|
||||
# Each section row ends with its Muted value in its own cell; both rows
|
||||
# must carry a Muted count of 1.
|
||||
muted_cells = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE)
|
||||
assert len(muted_cells) == 2
|
||||
|
||||
def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path):
|
||||
"""The Provider column must come from the matched Okta-IDaaS-STIG
|
||||
compliance, never from a different framework that happens to be the
|
||||
last entry in the check's compliance list."""
|
||||
# check_a maps to Okta-IDaaS-STIG (provider "okta") but its compliance
|
||||
# list ends with a *different* framework whose provider is "aws". With
|
||||
# the bug the leaked loop variable made the table render "aws".
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("okta", ["IAM"]),
|
||||
_make_compliance("aws", ["Other"], framework="OtherFramework"),
|
||||
]
|
||||
),
|
||||
"check_b": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("okta", ["IAM"]),
|
||||
_make_compliance("aws", ["Other"], framework="OtherFramework"),
|
||||
]
|
||||
),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
|
||||
get_okta_idaas_stig_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"okta_idaas_stig_1r2",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "okta" in captured.out
|
||||
# The provider of the unrelated trailing framework must NOT leak into
|
||||
# the rendered table.
|
||||
assert "aws" not in captured.out
|
||||
@@ -0,0 +1,147 @@
|
||||
import re
|
||||
from types import SimpleNamespace
|
||||
from unittest import mock
|
||||
|
||||
from prowler.lib.outputs.compliance.prowler_threatscore.prowler_threatscore import (
|
||||
get_prowler_threatscore_table,
|
||||
)
|
||||
|
||||
# Patch target for the Compliance.get_bulk lookup used to render pillars without
|
||||
# findings; the tests don't exercise that path so it returns nothing.
|
||||
COMPLIANCE_PATH = (
|
||||
"prowler.lib.outputs.compliance.prowler_threatscore.prowler_threatscore.Compliance"
|
||||
)
|
||||
|
||||
|
||||
def _make_finding(check_id, status="PASS", muted=False):
|
||||
return SimpleNamespace(
|
||||
check_metadata=SimpleNamespace(CheckID=check_id),
|
||||
status=status,
|
||||
muted=muted,
|
||||
)
|
||||
|
||||
|
||||
def _make_compliance(provider, pillars, framework="ProwlerThreatScore"):
|
||||
"""Build a per-check compliance covering the given pillars (Section)."""
|
||||
return SimpleNamespace(
|
||||
Framework=framework,
|
||||
Provider=provider,
|
||||
Requirements=[
|
||||
SimpleNamespace(
|
||||
Attributes=[SimpleNamespace(Section=pillar, LevelOfRisk=5, Weight=100)]
|
||||
)
|
||||
for pillar in pillars
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class TestProwlerThreatScoreTable:
|
||||
"""Verify multi-section counting and provider-column attribution for the compliance table."""
|
||||
|
||||
def test_multi_pillar_fail_not_undercounted(self, capsys, tmp_path):
|
||||
"""A single FAIL check mapped to several pillars must show FAIL(1) in
|
||||
every pillar, not just the first one seen."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[_make_compliance("aws", ["IAM", "Encryption"])]
|
||||
),
|
||||
"check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
|
||||
with mock.patch(COMPLIANCE_PATH) as compliance_mock:
|
||||
compliance_mock.get_bulk.return_value = {}
|
||||
get_prowler_threatscore_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"prowler_threatscore_aws",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
# Both IAM and Encryption must report FAIL(1); before the fix Encryption
|
||||
# was undercounted because the per-pillar count was gated by the global
|
||||
# dedup list.
|
||||
assert captured.out.count("FAIL(1)") == 2
|
||||
|
||||
def test_multi_pillar_muted_not_undercounted(self, capsys, tmp_path):
|
||||
"""A single MUTED check mapped to several pillars must increase the
|
||||
per-pillar Muted count in every pillar, not only the first one."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[_make_compliance("aws", ["IAM", "Encryption"])]
|
||||
),
|
||||
"check_b": SimpleNamespace(Compliance=[_make_compliance("aws", ["IAM"])]),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL", muted=True),
|
||||
# A real FAIL is needed so the results table is rendered at all.
|
||||
_make_finding("check_b", "FAIL"),
|
||||
]
|
||||
|
||||
with mock.patch(COMPLIANCE_PATH) as compliance_mock:
|
||||
compliance_mock.get_bulk.return_value = {}
|
||||
get_prowler_threatscore_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"prowler_threatscore_aws",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
plain = re.sub(r"\x1b\[[0-9;]*m", "", captured.out)
|
||||
# The muted check belongs to both IAM and Encryption, so the Muted
|
||||
# column must read 1 in both rows.
|
||||
muted_cells = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE)
|
||||
assert len(muted_cells) == 2
|
||||
|
||||
def test_provider_column_not_leaked_from_other_framework(self, capsys, tmp_path):
|
||||
"""The Provider column must come from the matched ProwlerThreatScore
|
||||
compliance, never from a different framework that happens to be the last
|
||||
entry in the check's compliance list."""
|
||||
bulk_metadata = {
|
||||
"check_a": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("aws", ["IAM"]),
|
||||
_make_compliance(
|
||||
"leaked_provider", ["Other"], framework="OtherFramework"
|
||||
),
|
||||
]
|
||||
),
|
||||
"check_b": SimpleNamespace(
|
||||
Compliance=[
|
||||
_make_compliance("aws", ["IAM"]),
|
||||
_make_compliance(
|
||||
"leaked_provider", ["Other"], framework="OtherFramework"
|
||||
),
|
||||
]
|
||||
),
|
||||
}
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
|
||||
with mock.patch(COMPLIANCE_PATH) as compliance_mock:
|
||||
compliance_mock.get_bulk.return_value = {}
|
||||
get_prowler_threatscore_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"prowler_threatscore_aws",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "aws" in captured.out
|
||||
# The provider of the unrelated trailing framework must NOT leak into
|
||||
# the rendered table.
|
||||
assert "leaked_provider" not in captured.out
|
||||
@@ -1,3 +1,4 @@
|
||||
import re
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
@@ -26,6 +27,10 @@ def _make_finding(check_id, status="PASS", muted=False):
|
||||
return finding
|
||||
|
||||
|
||||
def _strip_ansi(text):
|
||||
return re.sub(r"\x1b\[[0-9;]*m", "", text)
|
||||
|
||||
|
||||
def _make_framework(requirements, table_config, provider="AWS"):
|
||||
return ComplianceFramework(
|
||||
framework="TestFW",
|
||||
@@ -39,6 +44,8 @@ def _make_framework(requirements, table_config, provider="AWS"):
|
||||
|
||||
|
||||
class TestBuildRequirementCheckMap:
|
||||
"""Test cases for building the requirement-to-check map of a framework."""
|
||||
|
||||
def test_basic(self):
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
@@ -103,6 +110,8 @@ class TestBuildRequirementCheckMap:
|
||||
|
||||
|
||||
class TestGetGroupKey:
|
||||
"""Test cases for resolving the group key of a requirement."""
|
||||
|
||||
def test_normal_field(self):
|
||||
req = UniversalComplianceRequirement(
|
||||
id="1.1",
|
||||
@@ -124,7 +133,9 @@ class TestGetGroupKey:
|
||||
|
||||
|
||||
class TestGroupedMode:
|
||||
def test_grouped_rendering(self, capsys):
|
||||
"""Test cases for grouped-mode universal compliance table rendering."""
|
||||
|
||||
def test_grouped_rendering(self, capsys, tmp_path):
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
id="1.1",
|
||||
@@ -156,7 +167,7 @@ class TestGroupedMode:
|
||||
bulk_metadata,
|
||||
"test_fw",
|
||||
"output",
|
||||
"/tmp",
|
||||
str(tmp_path),
|
||||
False,
|
||||
framework=fw,
|
||||
)
|
||||
@@ -167,9 +178,118 @@ class TestGroupedMode:
|
||||
assert "PASS" in captured.out
|
||||
assert "FAIL" in captured.out
|
||||
|
||||
def test_grouped_multi_section_no_undercount(self, capsys, tmp_path):
|
||||
"""A single check mapped to several sections must be counted in
|
||||
every section it belongs to, not only the first one seen."""
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
id="1.1",
|
||||
description="test",
|
||||
attributes={"Section": "IAM"},
|
||||
checks={"aws": ["check_a", "check_b"]},
|
||||
),
|
||||
UniversalComplianceRequirement(
|
||||
id="2.1",
|
||||
description="test2",
|
||||
attributes={"Section": "Logging"},
|
||||
checks={"aws": ["check_a"]},
|
||||
),
|
||||
]
|
||||
tc = TableConfig(group_by="Section")
|
||||
fw = _make_framework(reqs, tc)
|
||||
|
||||
# check_a (FAIL) belongs to both IAM and Logging sections; check_b
|
||||
# (PASS, IAM only) is added so the overview total reaches 2 and the
|
||||
# results table is rendered.
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
bulk_metadata = {
|
||||
"check_a": MagicMock(Compliance=[]),
|
||||
"check_b": MagicMock(Compliance=[]),
|
||||
}
|
||||
|
||||
get_universal_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"test_fw",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
framework=fw,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
plain = _strip_ansi(captured.out)
|
||||
# Both the IAM and Logging rows must report FAIL(1). Before the fix the
|
||||
# second section seen (Logging) was undercounted to FAIL(0) and rendered
|
||||
# as PASS. Anchor each occurrence to its own table row so an unrelated
|
||||
# "FAIL(1)" elsewhere cannot mask an undercount.
|
||||
iam_row = [
|
||||
line for line in plain.splitlines() if "IAM" in line and "FAIL(1)" in line
|
||||
]
|
||||
logging_row = [
|
||||
line
|
||||
for line in plain.splitlines()
|
||||
if "Logging" in line and "FAIL(1)" in line
|
||||
]
|
||||
assert len(iam_row) == 1
|
||||
assert len(logging_row) == 1
|
||||
|
||||
def test_grouped_multi_section_muted_not_undercounted(self, capsys, tmp_path):
|
||||
"""A single MUTED finding mapped to several groups must be counted in
|
||||
the per-group Muted column of every group it belongs to."""
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
id="1.1",
|
||||
description="test",
|
||||
attributes={"Section": "IAM"},
|
||||
checks={"aws": ["check_a", "check_b"]},
|
||||
),
|
||||
UniversalComplianceRequirement(
|
||||
id="2.1",
|
||||
description="test2",
|
||||
attributes={"Section": "Logging"},
|
||||
checks={"aws": ["check_a"]},
|
||||
),
|
||||
]
|
||||
tc = TableConfig(group_by="Section")
|
||||
fw = _make_framework(reqs, tc)
|
||||
|
||||
# check_a is MUTED and belongs to both IAM and Logging; check_b is a
|
||||
# plain FAIL so the overview total reaches 2 and the table is rendered.
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL", muted=True),
|
||||
_make_finding("check_b", "FAIL"),
|
||||
]
|
||||
bulk_metadata = {
|
||||
"check_a": MagicMock(Compliance=[]),
|
||||
"check_b": MagicMock(Compliance=[]),
|
||||
}
|
||||
|
||||
get_universal_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"test_fw",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
framework=fw,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
plain = _strip_ansi(captured.out)
|
||||
# The muted finding belongs to both sections, so both the IAM row and
|
||||
# the Logging row must carry a Muted count of 1 in their last cell.
|
||||
muted_one_rows = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE)
|
||||
assert len(muted_one_rows) == 2
|
||||
|
||||
|
||||
class TestSplitMode:
|
||||
def test_split_rendering(self, capsys):
|
||||
"""Test cases for split-mode universal compliance table rendering."""
|
||||
|
||||
def test_split_rendering(self, capsys, tmp_path):
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
id="1.1",
|
||||
@@ -204,7 +324,7 @@ class TestSplitMode:
|
||||
bulk_metadata,
|
||||
"test_fw",
|
||||
"output",
|
||||
"/tmp",
|
||||
str(tmp_path),
|
||||
False,
|
||||
framework=fw,
|
||||
)
|
||||
@@ -214,9 +334,119 @@ class TestSplitMode:
|
||||
assert "Level 1" in captured.out
|
||||
assert "Level 2" in captured.out
|
||||
|
||||
def test_split_muted_multi_section_not_undercounted(self, capsys, tmp_path):
|
||||
"""In split mode a single MUTED finding mapped to several groups must
|
||||
be counted in the Muted column of every group it belongs to."""
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
id="1.1",
|
||||
description="test",
|
||||
attributes={"Section": "Storage", "Profile": "Level 1"},
|
||||
checks={"aws": ["check_a", "check_b"]},
|
||||
),
|
||||
UniversalComplianceRequirement(
|
||||
id="2.1",
|
||||
description="test2",
|
||||
attributes={"Section": "Logging", "Profile": "Level 1"},
|
||||
checks={"aws": ["check_a"]},
|
||||
),
|
||||
]
|
||||
tc = TableConfig(
|
||||
group_by="Section",
|
||||
split_by=SplitByConfig(field="Profile", values=["Level 1", "Level 2"]),
|
||||
)
|
||||
fw = _make_framework(reqs, tc)
|
||||
|
||||
# check_a is MUTED and belongs to both Storage and Logging; check_b is a
|
||||
# plain FAIL so the table is rendered.
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL", muted=True),
|
||||
_make_finding("check_b", "FAIL"),
|
||||
]
|
||||
bulk_metadata = {
|
||||
"check_a": MagicMock(Compliance=[]),
|
||||
"check_b": MagicMock(Compliance=[]),
|
||||
}
|
||||
|
||||
get_universal_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"test_fw",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
framework=fw,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
plain = _strip_ansi(captured.out)
|
||||
# Both section rows must carry a Muted count of 1 (last cell). Before the
|
||||
# fix only the first group seen incremented Muted, leaving the other 0.
|
||||
muted_one_rows = re.findall(r"│\s*1\s*│\s*$", plain, flags=re.MULTILINE)
|
||||
assert len(muted_one_rows) == 2
|
||||
|
||||
def test_split_same_group_value_not_double_counted(self, capsys, tmp_path):
|
||||
"""A single finding whose check maps to several requirements that share
|
||||
the same group and split value must count once for that group/split,
|
||||
not once per requirement (FAIL(1), never FAIL(2))."""
|
||||
reqs = [
|
||||
# check_a appears in two requirements, both Storage / Level 1.
|
||||
UniversalComplianceRequirement(
|
||||
id="1.1",
|
||||
description="test",
|
||||
attributes={"Section": "Storage", "Profile": "Level 1"},
|
||||
checks={"aws": ["check_a"]},
|
||||
),
|
||||
UniversalComplianceRequirement(
|
||||
id="1.2",
|
||||
description="test2",
|
||||
attributes={"Section": "Storage", "Profile": "Level 1"},
|
||||
checks={"aws": ["check_a"]},
|
||||
),
|
||||
# A second group so the table renders with more than one finding.
|
||||
UniversalComplianceRequirement(
|
||||
id="2.1",
|
||||
description="test3",
|
||||
attributes={"Section": "Logging", "Profile": "Level 1"},
|
||||
checks={"aws": ["check_b"]},
|
||||
),
|
||||
]
|
||||
tc = TableConfig(
|
||||
group_by="Section",
|
||||
split_by=SplitByConfig(field="Profile", values=["Level 1", "Level 2"]),
|
||||
)
|
||||
fw = _make_framework(reqs, tc)
|
||||
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
bulk_metadata = {
|
||||
"check_a": MagicMock(Compliance=[]),
|
||||
"check_b": MagicMock(Compliance=[]),
|
||||
}
|
||||
|
||||
get_universal_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"test_fw",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
framework=fw,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
plain = _strip_ansi(captured.out)
|
||||
# The Storage row must show FAIL(1) for Level 1, never FAIL(2).
|
||||
assert "FAIL(1)" in plain
|
||||
assert "FAIL(2)" not in plain
|
||||
|
||||
|
||||
class TestScoredMode:
|
||||
def test_scored_rendering(self, capsys):
|
||||
"""Test cases for scored-mode universal compliance table rendering."""
|
||||
|
||||
def test_scored_rendering(self, capsys, tmp_path):
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
id="1.1",
|
||||
@@ -251,7 +481,7 @@ class TestScoredMode:
|
||||
bulk_metadata,
|
||||
"test_fw",
|
||||
"output",
|
||||
"/tmp",
|
||||
str(tmp_path),
|
||||
False,
|
||||
framework=fw,
|
||||
)
|
||||
@@ -261,9 +491,68 @@ class TestScoredMode:
|
||||
assert "Score" in captured.out
|
||||
assert "Threat Score" in captured.out
|
||||
|
||||
def test_scored_multi_section_fail_not_undercounted(self, capsys, tmp_path):
|
||||
"""In scored mode a single FAIL finding mapped to several groups must
|
||||
show FAIL(1) in every group it belongs to, not only the first one."""
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
id="1.1",
|
||||
description="test",
|
||||
attributes={"Section": "IAM", "LevelOfRisk": 5, "Weight": 100},
|
||||
checks={"aws": ["check_a", "check_b"]},
|
||||
),
|
||||
UniversalComplianceRequirement(
|
||||
id="2.1",
|
||||
description="test2",
|
||||
attributes={"Section": "Logging", "LevelOfRisk": 3, "Weight": 50},
|
||||
checks={"aws": ["check_a"]},
|
||||
),
|
||||
]
|
||||
tc = TableConfig(
|
||||
group_by="Section",
|
||||
scoring=ScoringConfig(risk_field="LevelOfRisk", weight_field="Weight"),
|
||||
)
|
||||
fw = _make_framework(reqs, tc)
|
||||
|
||||
# check_a (FAIL) belongs to both IAM and Logging; check_b (PASS, IAM
|
||||
# only) raises the overview total to 2 so the table is rendered.
|
||||
findings = [
|
||||
_make_finding("check_a", "FAIL"),
|
||||
_make_finding("check_b", "PASS"),
|
||||
]
|
||||
bulk_metadata = {
|
||||
"check_a": MagicMock(Compliance=[]),
|
||||
"check_b": MagicMock(Compliance=[]),
|
||||
}
|
||||
|
||||
get_universal_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"test_fw",
|
||||
"output",
|
||||
str(tmp_path),
|
||||
False,
|
||||
framework=fw,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
plain = _strip_ansi(captured.out)
|
||||
iam_row = [
|
||||
line for line in plain.splitlines() if "IAM" in line and "FAIL(1)" in line
|
||||
]
|
||||
logging_row = [
|
||||
line
|
||||
for line in plain.splitlines()
|
||||
if "Logging" in line and "FAIL(1)" in line
|
||||
]
|
||||
assert len(iam_row) == 1
|
||||
assert len(logging_row) == 1
|
||||
|
||||
|
||||
class TestCustomLabels:
|
||||
def test_ens_spanish_labels(self, capsys):
|
||||
"""Test cases for custom-label universal compliance table rendering."""
|
||||
|
||||
def test_ens_spanish_labels(self, capsys, tmp_path):
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
id="1.1",
|
||||
@@ -300,7 +589,7 @@ class TestCustomLabels:
|
||||
bulk_metadata,
|
||||
"test_fw",
|
||||
"output",
|
||||
"/tmp",
|
||||
str(tmp_path),
|
||||
False,
|
||||
framework=fw,
|
||||
)
|
||||
@@ -311,7 +600,9 @@ class TestCustomLabels:
|
||||
|
||||
|
||||
class TestMultiProviderDictChecks:
|
||||
def test_only_aws_checks_matched(self, capsys):
|
||||
"""Test cases for multi-provider dict checks in the universal table."""
|
||||
|
||||
def test_only_aws_checks_matched(self, capsys, tmp_path):
|
||||
"""With dict checks and provider='aws', only AWS checks match findings."""
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
@@ -352,7 +643,7 @@ class TestMultiProviderDictChecks:
|
||||
bulk_metadata,
|
||||
"multi_cloud",
|
||||
"output",
|
||||
"/tmp",
|
||||
str(tmp_path),
|
||||
False,
|
||||
framework=fw,
|
||||
provider="aws",
|
||||
@@ -366,7 +657,9 @@ class TestMultiProviderDictChecks:
|
||||
|
||||
|
||||
class TestNoTableConfig:
|
||||
def test_returns_early_without_table_config(self, capsys):
|
||||
"""Test cases for the universal table when no table config is present."""
|
||||
|
||||
def test_returns_early_without_table_config(self, capsys, tmp_path):
|
||||
fw = ComplianceFramework(
|
||||
framework="TestFW",
|
||||
name="Test",
|
||||
@@ -374,11 +667,11 @@ class TestNoTableConfig:
|
||||
description="Test",
|
||||
requirements=[],
|
||||
)
|
||||
get_universal_table([], {}, "test", "out", "/tmp", False, framework=fw)
|
||||
get_universal_table([], {}, "test", "out", str(tmp_path), False, framework=fw)
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == ""
|
||||
|
||||
def test_returns_early_without_framework(self, capsys):
|
||||
get_universal_table([], {}, "test", "out", "/tmp", False, framework=None)
|
||||
def test_returns_early_without_framework(self, capsys, tmp_path):
|
||||
get_universal_table([], {}, "test", "out", str(tmp_path), False, framework=None)
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == ""
|
||||
|
||||
Reference in New Issue
Block a user