feat(dashboard): render dynamic-provider compliance frameworks (#11503)

Co-authored-by: pedrooot <pedromarting3@gmail.com>
This commit is contained in:
StylusFrost
2026-06-10 11:16:39 +02:00
committed by GitHub
parent 4a5a49b5bb
commit 01b49f0743
13 changed files with 814 additions and 33 deletions
View File
+81
View File
@@ -0,0 +1,81 @@
import pandas as pd
from dash import dash_table
from dashboard.common_methods import get_section_containers_generic
def _datatable_column_ids(component):
    """Return one list of column ids for each DataTable under ``component``.

    The component tree is walked depth-first through ``children``. A
    DataTable is treated as a leaf; a node without ``children`` (or whose
    ``children`` is ``None``) contributes nothing.
    """
    if isinstance(component, dash_table.DataTable):
        return [[column["id"] for column in component.columns]]

    nested = getattr(component, "children", None)
    if nested is None:
        return []

    # ``children`` may be a single node instead of a sequence; normalise it.
    child_nodes = nested if isinstance(nested, (list, tuple)) else [nested]

    collected = []
    for node in child_nodes:
        collected.extend(_datatable_column_ids(node))
    return collected
def _df(**extra):
    """Build a one-row findings DataFrame, optionally extended by ``extra``.

    Each keyword argument becomes a column. A keyword that matches one of the
    default column names replaces that column's values; any other keyword is
    appended after the defaults.
    """
    base_columns = {
        "REQUIREMENTS_ID": ["req1"],
        "STATUS": ["PASS"],
        "CHECKID": ["check1"],
        "REGION": ["us-east-1"],
        "ACCOUNTID": ["123"],
        "RESOURCEID": ["res1"],
    }
    return pd.DataFrame({**base_columns, **extra})
class TestGetSectionContainersGeneric:
    """Tests for ``get_section_containers_generic``."""

    def test_one_container_per_section(self):
        """Each distinct section value yields exactly one outer container."""
        findings = pd.DataFrame(
            {
                "REQUIREMENTS_ATTRIBUTES_SECTION": ["Sec A", "Sec A", "Sec B"],
                "REQUIREMENTS_ID": ["req1", "req2", "req3"],
                "STATUS": ["PASS", "FAIL", "PASS"],
                "CHECKID": ["c1", "c2", "c3"],
                "REGION": ["-", "-", "-"],
                "ACCOUNTID": ["123", "123", "123"],
                "RESOURCEID": ["r1", "r2", "r3"],
            }
        )

        layout = get_section_containers_generic(
            findings, "REQUIREMENTS_ATTRIBUTES_SECTION", "REQUIREMENTS_ID"
        )

        # "Sec A" and "Sec B" are the only distinct sections in the input.
        assert len(layout.children) == 2

    def test_inner_title_includes_id_and_description(self):
        """The rendered tree contains the title '<id> - <description>'."""
        findings = _df(
            REQUIREMENTS_ATTRIBUTES_SECTION=["Sec A"],
            REQUIREMENTS_DESCRIPTION=["Ensure MFA"],
        )

        layout = get_section_containers_generic(
            findings, "REQUIREMENTS_ATTRIBUTES_SECTION", "REQUIREMENTS_ID"
        )

        assert "req1 - Ensure MFA" in str(layout)

    def test_arbitrary_ids_do_not_crash(self):
        """Non-numeric requirement ids render without raising.

        Every DataTable found in the result must expose a CHECKID column.
        """
        findings = pd.DataFrame(
            {
                "REQUIREMENTS_ATTRIBUTES_SECTION": ["Sec A", "Sec A", "Sec A"],
                "REQUIREMENTS_ID": ["AC-2(1)", "foo-bar", "step.1.2"],
                "STATUS": ["PASS", "FAIL", "PASS"],
                "CHECKID": ["c1", "c2", "c3"],
                "REGION": ["-", "-", "-"],
                "ACCOUNTID": ["123", "123", "123"],
                "RESOURCEID": ["r1", "r2", "r3"],
            }
        )

        layout = get_section_containers_generic(
            findings, "REQUIREMENTS_ATTRIBUTES_SECTION", "REQUIREMENTS_ID"
        )

        column_ids_per_table = _datatable_column_ids(layout)
        assert column_ids_per_table  # at least one DataTable was rendered
        assert all("CHECKID" in ids for ids in column_ids_per_table)
+204
View File
@@ -0,0 +1,204 @@
import pandas as pd
from dash import dash_table, html
from dashboard.compliance.generic import get_table
def _make_minimal_df(**extra_cols):
    """Return a single-row DataFrame with the base columns used by these tests.

    Keyword arguments are merged in as additional columns; a keyword that
    reuses a default column name overrides that column.
    """
    defaults = {
        "REQUIREMENTS_ID": ["req1"],
        "STATUS": ["PASS"],
        "CHECKID": ["check1"],
        "REGION": ["us-east-1"],
        "ACCOUNTID": ["123456789"],
        "RESOURCEID": ["res1"],
    }
    merged = dict(defaults, **extra_cols)
    return pd.DataFrame(merged)
def _datatable_column_ids(component):
    """Gather the column ids of every DataTable reachable from ``component``.

    Returns a list with one entry per DataTable, each entry being that
    table's column ids in order. The search descends through ``children``
    and does not look inside a DataTable once one is found.
    """
    if isinstance(component, dash_table.DataTable):
        table_ids = [spec["id"] for spec in component.columns]
        return [table_ids]

    kids = getattr(component, "children", None)
    if kids is None:
        # Leaf node (or a value with no ``children`` attribute at all).
        return []

    if not isinstance(kids, (list, tuple)):
        # A single child is wrapped so it can be iterated like a sequence.
        kids = [kids]

    results = []
    for kid in kids:
        results += _datatable_column_ids(kid)
    return results
class TestGetTable:
    """Tests for ``get_table`` from the generic compliance renderer."""

    def test_groups_by_section(self):
        """SC-001a: a REQUIREMENTS_ATTRIBUTES_SECTION column groups by section."""
        findings = pd.DataFrame(
            {
                "REQUIREMENTS_ATTRIBUTES_SECTION": ["Section A"] * 3
                + ["Section B"] * 2,
                "REQUIREMENTS_ID": ["ctrl-alpha"] * 3 + ["ctrl-beta"] * 2,
                "STATUS": ["PASS", "FAIL", "PASS", "FAIL", "FAIL"],
                "CHECKID": ["check1", "check2", "check3", "check4", "check5"],
                "REGION": ["us-east-1"] * 5,
                "ACCOUNTID": ["123"] * 5,
                "RESOURCEID": ["res1", "res2", "res3", "res4", "res5"],
            }
        )

        layout = get_table(findings)

        assert isinstance(layout, html.Div)
        assert layout.className == "compliance-data-layout"
        # Two distinct sections -> two top-level containers.
        assert len(layout.children) == 2

    def test_flat_fallback_no_attributes(self):
        """SC-001b: without REQUIREMENTS_ATTRIBUTES_* columns, group by id."""
        findings = pd.DataFrame(
            {
                "REQUIREMENTS_ID": ["req1", "req1", "req2"],
                "STATUS": ["PASS", "FAIL", "FAIL"],
                "CHECKID": ["check1", "check2", "check3"],
                "REGION": ["us-east-1"] * 3,
                "ACCOUNTID": ["123"] * 3,
                "RESOURCEID": ["res1", "res2", "res3"],
            }
        )

        layout = get_table(findings)

        assert isinstance(layout, html.Div)
        assert layout.className == "compliance-data-layout"
        # "req1" and "req2" -> two group containers.
        assert len(layout.children) == 2

    def test_arbitrary_ids_no_crash(self):
        """ADR-2 / R1 regression guard: non-numeric ids must not raise ValueError.

        Rationale recorded with this test: get_section_containers_cis sorts by
        version_tuple, which calls int() on each dotted/dashed segment and
        crashes on ids such as 'AC-2(1)'. Selecting format4 (no version sort)
        is the fix, and this test permanently guards against a regression.
        """
        findings = pd.DataFrame(
            {
                "REQUIREMENTS_ID": ["AC-2(1)", "foo-bar", "step.1.2"],
                "STATUS": ["PASS", "FAIL", "PASS"],
                "CHECKID": ["check1", "check2", "check3"],
                "REGION": ["us-east-1"] * 3,
                "ACCOUNTID": ["123"] * 3,
                "RESOURCEID": ["res1", "res2", "res3"],
            }
        )

        # The call itself is the check: it must complete without ValueError.
        layout = get_table(findings)

        assert isinstance(layout, html.Div)

    def test_discovers_multiple_attribute_columns(self):
        """SC-005a: several REQUIREMENTS_ATTRIBUTES_* columns are tolerated.

        No AttributeError may be raised and the component tree is non-empty.
        """
        findings = pd.DataFrame(
            {
                "REQUIREMENTS_ATTRIBUTES_SECTION": ["Sec A", "Sec B"],
                "REQUIREMENTS_ATTRIBUTES_CATEGORY": ["Cat 1", "Cat 2"],
                "REQUIREMENTS_ATTRIBUTES_CONTROL_ID": ["C1", "C2"],
                "REQUIREMENTS_ID": ["req1", "req2"],
                "STATUS": ["PASS", "FAIL"],
                "CHECKID": ["check1", "check2"],
                "REGION": ["us-east-1"] * 2,
                "ACCOUNTID": ["123"] * 2,
                "RESOURCEID": ["res1", "res2"],
            }
        )

        layout = get_table(findings)

        assert isinstance(layout, html.Div)
        assert layout.children  # non-empty component tree

    def test_novel_attribute_column_names(self):
        """SC-005b: unfamiliar attribute columns and no SECTION column.

        The first attribute column is expected to serve as the grouping, and
        a valid html.Div is returned without any code change being required.
        """
        findings = pd.DataFrame(
            {
                "REQUIREMENTS_ATTRIBUTES_DOMAIN": ["Domain A", "Domain B"],
                "REQUIREMENTS_ATTRIBUTES_SUBDOMAIN": ["Sub 1", "Sub 2"],
                "REQUIREMENTS_ID": ["req1", "req2"],
                "STATUS": ["PASS", "FAIL"],
                "CHECKID": ["check1", "check2"],
                "REGION": ["us-east-1"] * 2,
                "ACCOUNTID": ["123"] * 2,
                "RESOURCEID": ["res1", "res2"],
            }
        )

        layout = get_table(findings)

        assert isinstance(layout, html.Div)
        assert len(layout.children) > 0

    def test_manual_only_requirements(self):
        """SC-008a: all-MANUAL findings still render a populated html.Div.

        The result must not be the 'No data found' string.
        """
        findings = pd.DataFrame(
            {
                "REQUIREMENTS_ATTRIBUTES_SECTION": ["Sec A", "Sec B"],
                "REQUIREMENTS_ID": ["req1", "req2"],
                "STATUS": ["MANUAL", "MANUAL"],
                "CHECKID": ["check1", "check2"],
                "REGION": ["us-east-1"] * 2,
                "ACCOUNTID": ["123"] * 2,
                "RESOURCEID": ["res1", "res2"],
            }
        )

        layout = get_table(findings)

        assert isinstance(layout, html.Div)
        assert not isinstance(layout, str)
        assert layout.children  # non-empty

    def test_empty_dataframe(self):
        """SC-009a: zero rows with the right schema give a valid html.Div."""
        column_names = [
            "REQUIREMENTS_ATTRIBUTES_SECTION",
            "REQUIREMENTS_ID",
            "STATUS",
            "CHECKID",
            "REGION",
            "ACCOUNTID",
            "RESOURCEID",
        ]
        findings = pd.DataFrame(
            {name: pd.Series([], dtype=str) for name in column_names}
        )

        layout = get_table(findings)

        assert isinstance(layout, html.Div)

    def test_get_table_returns_html_div(self):
        """SC-012a: smoke test that get_table(df) is an html.Div instance."""
        findings = _make_minimal_df(REQUIREMENTS_ATTRIBUTES_SECTION=["Sec A"])

        layout = get_table(findings)

        assert isinstance(layout, html.Div)
class TestNestedRendering:
    """Tests for the nesting of section, requirement and checks table."""

    def test_section_and_requirement_id_are_separate_levels(self):
        """The section text and the '<id> - <description>' title both render."""
        findings = _make_minimal_df(
            REQUIREMENTS_ATTRIBUTES_SECTION=["3 Compute Services"],
            REQUIREMENTS_DESCRIPTION=["Ensure only MFA enabled identities"],
        )

        markup = str(get_table(findings))

        assert "3 Compute Services" in markup
        assert "req1 - Ensure only MFA enabled identities" in markup

    def test_checks_table_is_nested_under_requirement(self):
        """At least one DataTable renders and each one has a CHECKID column."""
        findings = _make_minimal_df(
            REQUIREMENTS_ATTRIBUTES_SECTION=["Sec A"],
            REQUIREMENTS_DESCRIPTION=["Some requirement"],
        )

        column_ids_per_table = _datatable_column_ids(get_table(findings))

        assert column_ids_per_table
        assert all("CHECKID" in ids for ids in column_ids_per_table)
View File
@@ -0,0 +1,179 @@
from unittest.mock import MagicMock, patch
import pandas as pd
import pytest
from dash import html
from dashboard.pages.compliance import _dispatch_compliance_renderer
def _make_dispatch_df(**extra_cols):
    """Return a two-row DataFrame carrying the columns the dedup step needs.

    Keyword arguments add columns, or override a default column when the
    name matches one.
    """
    base_columns = {
        "REQUIREMENTS_ID": ["req1", "req2"],
        "REQUIREMENTS_ATTRIBUTES_SECTION": ["Sec A", "Sec A"],
        "STATUS": ["PASS", "FAIL"],
        "CHECKID": ["check1", "check2"],
        "RESOURCEID": ["res1", "res2"],
        "STATUSEXTENDED": ["", ""],
        "REGION": ["us-east-1", "us-east-1"],
        "ACCOUNTID": ["123456789", "123456789"],
    }
    return pd.DataFrame({**base_columns, **extra_cols})
class TestDispatchComplianceRenderer:
    """Tests for ``_dispatch_compliance_renderer``."""

    def test_builtin_name_uses_builtin_module(self):
        """SC-002a: 'cis_4_0_aws' resolves the real builtin module.

        The call returns an (html.Div, DataFrame) 2-tuple.
        """
        findings = pd.DataFrame(
            {
                "REQUIREMENTS_ID": ["1.1", "1.2"],
                "REQUIREMENTS_DESCRIPTION": ["Description 1", "Description 2"],
                "REQUIREMENTS_ATTRIBUTES_SECTION": ["Section A", "Section A"],
                "CHECKID": ["check1", "check2"],
                "STATUS": ["PASS", "FAIL"],
                "REGION": ["us-east-1", "us-east-1"],
                "ACCOUNTID": ["123456789", "123456789"],
                "RESOURCEID": ["res1", "res2"],
                "STATUSEXTENDED": ["Pass", "Fail"],
            }
        )

        rendered, deduped = _dispatch_compliance_renderer(findings, "cis_4_0_aws")

        assert isinstance(rendered, html.Div)
        assert isinstance(deduped, pd.DataFrame)

    def test_unknown_name_falls_back_to_generic(self):
        """SC-003a: an unknown analytics_input routes to the generic renderer.

        The unknown name raises ModuleNotFoundError internally, after which
        the generic ``get_table`` is invoked exactly once.
        """
        findings = _make_dispatch_df()
        generic_get_table = MagicMock(
            return_value=html.Div([], className="compliance-data-layout")
        )

        with patch("dashboard.compliance.generic.get_table", generic_get_table):
            rendered, deduped = _dispatch_compliance_renderer(
                findings, "myfw_dynprovider"
            )
            generic_get_table.assert_called_once()

        assert isinstance(rendered, html.Div)
        assert isinstance(deduped, pd.DataFrame)

    def test_import_error_is_not_swallowed(self):
        """SC-003b: a plain ImportError is re-raised, not routed to generic.

        Only ModuleNotFoundError may trigger the generic fallback.
        """
        findings = _make_dispatch_df()

        with patch(
            "dashboard.pages.compliance.importlib.import_module",
            side_effect=ImportError("custom error"),
        ), pytest.raises(ImportError, match="custom error"):
            _dispatch_compliance_renderer(findings, "anything")

    def test_get_table_error_in_generic_surfaces(self):
        """SC-004a: a ValueError from generic.get_table propagates.

        This shows get_table is called outside the try block.
        """
        findings = _make_dispatch_df()

        with patch(
            "dashboard.compliance.generic.get_table",
            side_effect=ValueError("boom"),
        ), pytest.raises(ValueError, match="boom"):
            _dispatch_compliance_renderer(findings, "myfw_dynprovider")

    def test_get_table_error_in_builtin_surfaces(self):
        """REQ-004 / ADR-1: a RuntimeError from a builtin get_table propagates.

        This shows get_table is called outside the try block.
        """
        findings = _make_dispatch_df()
        fake_builtin = MagicMock()
        fake_builtin.get_table.side_effect = RuntimeError("table error")

        with patch(
            "dashboard.pages.compliance.importlib.import_module",
            return_value=fake_builtin,
        ), pytest.raises(RuntimeError, match="table error"):
            _dispatch_compliance_renderer(findings, "some_builtin_fw")

    def test_dedup_applied_before_get_table(self):
        """ADR-1: rows identical in the dedup key columns are collapsed.

        Rows 0 and 1 share CHECKID/STATUS/RESOURCEID/STATUSEXTENDED and row 2
        is unique, so the returned data holds two rows.
        """
        findings = pd.DataFrame(
            {
                "REQUIREMENTS_ATTRIBUTES_SECTION": ["Sec A", "Sec A", "Sec B"],
                "REQUIREMENTS_ID": ["req1", "req1", "req2"],
                "STATUS": ["PASS", "PASS", "FAIL"],
                "CHECKID": ["check1", "check1", "check2"],
                "RESOURCEID": ["res1", "res1", "res2"],
                "STATUSEXTENDED": ["", "", ""],
                "REGION": ["us-east-1"] * 3,
                "ACCOUNTID": ["123"] * 3,
            }
        )
        fake_builtin = MagicMock()
        fake_builtin.get_table.return_value = html.Div([])

        with patch(
            "dashboard.pages.compliance.importlib.import_module",
            return_value=fake_builtin,
        ):
            _, deduped = _dispatch_compliance_renderer(findings, "some_fw")

        assert len(deduped) == 2  # one duplicate removed

    def test_muted_column_added_to_dedup_when_present(self):
        """ADR-1 edge case: MUTED takes part in the dedup subset when present.

        Both rows share CHECKID/STATUS/RESOURCEID/STATUSEXTENDED and differ
        only in MUTED. With MUTED in the dedup columns (at index 2) both rows
        are kept; without it they would collapse to a single row.
        """
        findings = pd.DataFrame(
            {
                "REQUIREMENTS_ATTRIBUTES_SECTION": ["Sec A", "Sec A"],
                "REQUIREMENTS_ID": ["req1", "req1"],
                "STATUS": ["PASS", "PASS"],
                "CHECKID": ["check1", "check1"],
                "RESOURCEID": ["res1", "res1"],
                "STATUSEXTENDED": ["", ""],
                "MUTED": ["True", "False"],
                "REGION": ["us-east-1", "us-east-1"],
                "ACCOUNTID": ["123", "123"],
            }
        )
        fake_builtin = MagicMock()
        fake_builtin.get_table.return_value = html.Div([])

        with patch(
            "dashboard.pages.compliance.importlib.import_module",
            return_value=fake_builtin,
        ):
            _, deduped = _dispatch_compliance_renderer(findings, "some_fw")

        # Differing MUTED values give the rows different dedup keys.
        assert len(deduped) == 2

    def test_returns_table_and_data_tuple(self):
        """ADR-1 interface contract: the result is a (table, deduped_data) tuple."""
        findings = pd.DataFrame(
            {
                "REQUIREMENTS_ID": ["1.1", "1.2"],
                "REQUIREMENTS_DESCRIPTION": ["Desc 1", "Desc 2"],
                "REQUIREMENTS_ATTRIBUTES_SECTION": ["Section A", "Section A"],
                "CHECKID": ["check1", "check2"],
                "STATUS": ["PASS", "FAIL"],
                "REGION": ["us-east-1", "us-east-1"],
                "ACCOUNTID": ["123456789", "123456789"],
                "RESOURCEID": ["res1", "res2"],
                "STATUSEXTENDED": ["", ""],
            }
        )

        outcome = _dispatch_compliance_renderer(findings, "cis_4_0_aws")

        assert isinstance(outcome, tuple)
        assert len(outcome) == 2
        rendered, deduped = outcome
        assert isinstance(rendered, html.Div)
        assert isinstance(deduped, pd.DataFrame)
+7
View File
@@ -0,0 +1,7 @@
import dash
# Create a minimal Dash app at import time so that dashboard page modules can
# call dash.register_page() while being imported without raising PageError.
# Because this is module-level code, it runs during pytest collection, before
# any test file in this directory is imported.
# NOTE(review): presumably this file is the directory's conftest.py, which is
# what would make it load ahead of the test modules — confirm the filename.
# NOTE(review): pages_folder="" presumably keeps Dash from looking for a pages
# directory on disk — confirm against the Dash multi-page app documentation.
_test_app = dash.Dash("prowler_test_app", use_pages=True, pages_folder="")
@@ -0,0 +1,60 @@
import pandas as pd
from dashboard.pages.compliance import _ensure_scope_columns
def _df(columns):
    """Return a one-row DataFrame whose columns follow ``columns`` in order.

    Every cell holds the placeholder value "x".
    """
    single_row = {name: ["x"] for name in columns}
    return pd.DataFrame(single_row)
class TestEnsureScopeColumns:
    """Tests for ``_ensure_scope_columns`` and its ACCOUNTID/REGION output."""

    def test_aws_account_and_region_preserved(self):
        """Existing ACCOUNTID and REGION columns are kept, values included."""
        source = _df(
            ["PROVIDER", "DESCRIPTION", "ACCOUNTID", "REGION", "ASSESSMENTDATE"]
        )

        normalized = _ensure_scope_columns(source)

        assert "ACCOUNTID" in normalized.columns
        assert "REGION" in normalized.columns
        assert normalized["ACCOUNTID"].iloc[0] == "x"

    def test_okta_single_scope_column_becomes_accountid(self):
        """Okta's ORGANIZATIONDOMAIN is renamed to ACCOUNTID; REGION is '-'."""
        source = _df(
            ["PROVIDER", "DESCRIPTION", "ORGANIZATIONDOMAIN", "ASSESSMENTDATE"]
        )
        source["ORGANIZATIONDOMAIN"] = ["trial-123.okta.com"]

        normalized = _ensure_scope_columns(source)

        assert "ACCOUNTID" in normalized.columns
        assert "ORGANIZATIONDOMAIN" not in normalized.columns
        assert normalized["ACCOUNTID"].iloc[0] == "trial-123.okta.com"
        assert normalized["REGION"].iloc[0] == "-"

    def test_two_unknown_scope_columns_map_to_account_and_region(self):
        """Two scope columns map, in order, to ACCOUNTID and then REGION."""
        source = _df(
            ["PROVIDER", "DESCRIPTION", "TENANCYID", "LOCATION", "ASSESSMENTDATE"]
        )
        source["TENANCYID"] = ["tenant-1"]
        source["LOCATION"] = ["eu-west-1"]

        normalized = _ensure_scope_columns(source)

        assert normalized["ACCOUNTID"].iloc[0] == "tenant-1"
        assert normalized["REGION"].iloc[0] == "eu-west-1"

    def test_no_scope_columns_fall_back_to_dash(self):
        """With no scope columns, ACCOUNTID and REGION are both '-'."""
        source = _df(["PROVIDER", "DESCRIPTION", "ASSESSMENTDATE"])

        normalized = _ensure_scope_columns(source)

        assert normalized["ACCOUNTID"].iloc[0] == "-"
        assert normalized["REGION"].iloc[0] == "-"

    def test_missing_anchors_still_fall_back_to_dash(self):
        """Without DESCRIPTION/ASSESSMENTDATE anchors, both values are '-'."""
        source = _df(["PROVIDER", "FOO", "BAR"])

        normalized = _ensure_scope_columns(source)

        assert normalized["ACCOUNTID"].iloc[0] == "-"
        assert normalized["REGION"].iloc[0] == "-"

    def test_existing_accountid_does_not_consume_region_scope(self):
        """A present ACCOUNTID is kept and the remaining scope becomes REGION."""
        source = _df(
            ["PROVIDER", "DESCRIPTION", "ACCOUNTID", "LOCATION", "ASSESSMENTDATE"]
        )
        source["ACCOUNTID"] = ["acc-1"]
        source["LOCATION"] = ["us-east-2"]

        normalized = _ensure_scope_columns(source)

        assert normalized["ACCOUNTID"].iloc[0] == "acc-1"
        assert normalized["REGION"].iloc[0] == "us-east-2"