Compare commits

...

2 Commits

Author SHA1 Message Date
pedrooot
6534494b29 feat(sdk): add universal compliance output modules (CSV, OCSF, table) 2026-03-10 17:38:34 +01:00
pedrooot
0766a4e1dd feat(sdk): add universal compliance schema models and loaders 2026-03-10 17:37:06 +01:00
10 changed files with 4108 additions and 1 deletion

View File

@@ -1,9 +1,10 @@
import json
import os
import sys
from enum import Enum
from typing import Optional, Union
from pydantic.v1 import BaseModel, ValidationError, root_validator
from pydantic.v1 import BaseModel, Field, ValidationError, root_validator
from prowler.lib.check.utils import list_compliance_modules
from prowler.lib.logger import logger
@@ -429,3 +430,491 @@ def load_compliance_framework(
sys.exit(1)
else:
return compliance_framework
# ─── Universal Compliance Schema Models (Phase 1-3) ─────────────────────────
class AttributeMetadata(BaseModel):
    """Schema descriptor for a single attribute field in a universal compliance framework."""

    Key: str  # attribute name as it appears in Requirement.Attributes
    Label: Optional[str] = None  # human-readable display label
    Type: str = "str"  # str, int, float, list_str, list_dict, bool
    Enum: Optional[list] = None  # allowed values when the attribute is enum-constrained
    CSV: bool = True  # include this attribute as a CSV column
    OCSF: bool = True  # include this attribute in OCSF unmapped output
    Required: bool = False  # when True, every requirement must define this attribute
    EnumDisplay: Optional[dict] = None  # enum_value -> EnumValueDisplay dict
    EnumOrder: Optional[list] = None  # explicit ordering of enum values
    ChartLabel: Optional[str] = None  # axis label when used in charts
class SplitByConfig(BaseModel):
    """Column-splitting configuration (e.g. CIS Level 1/Level 2)."""

    Field: str  # attribute key whose values split the table into columns
    Values: list  # ordered attribute values, one table column per value
class ScoringConfig(BaseModel):
    """Weighted scoring configuration (e.g. ThreatScore)."""

    RiskField: str  # attribute key holding the requirement's risk level
    WeightField: str  # attribute key holding the requirement's weight
class TableLabels(BaseModel):
    """Custom pass/fail labels for console table rendering."""

    PassLabel: str = "PASS"
    FailLabel: str = "FAIL"
    ProviderHeader: str = "Provider"
    GroupHeader: Optional[str] = None  # header for the group-by column
    StatusHeader: str = "Status"
    Title: Optional[str] = None  # overall table title
    ResultsTitle: Optional[str] = None  # title for the results section
    FooterNote: Optional[str] = None  # note rendered below the table
class TableConfig(BaseModel):
    """Declarative rendering instructions for the console compliance table."""

    GroupBy: str  # attribute key used to group requirements into rows
    SplitBy: Optional[SplitByConfig] = None  # optional per-value column split
    Scoring: Optional[ScoringConfig] = None  # optional weighted scoring
    Labels: Optional[TableLabels] = None  # optional custom labels
class EnumValueDisplay(BaseModel):
    """Per-enum-value visual metadata for PDF rendering.

    Replaces hardcoded DIMENSION_MAPPING, TIPO_ICONS, nivel colors.
    """

    Label: Optional[str] = None  # e.g. "Trazabilidad"
    Abbreviation: Optional[str] = None  # e.g. "T"
    Color: Optional[str] = None  # hex color string, e.g. "#4286F4"
    Icon: Optional[str] = None  # emoji
class ChartConfig(BaseModel):
    """Declarative chart description for PDF reports."""

    Id: str  # unique chart identifier
    Type: str  # vertical_bar | horizontal_bar | radar
    GroupBy: str  # attribute key to group by
    Title: Optional[str] = None
    XLabel: Optional[str] = None
    YLabel: Optional[str] = None
    ValueSource: str = "compliance_percent"  # metric plotted per group
    ColorMode: str = "by_value"  # by_value | fixed | by_group
    FixedColor: Optional[str] = None  # hex color used when ColorMode == "fixed"
class ScoringFormula(BaseModel):
    """Weighted scoring formula (e.g. ThreatScore)."""

    RiskField: str  # attribute key holding the risk level, e.g. "LevelOfRisk"
    WeightField: str  # attribute key holding the weight, e.g. "Weight"
    RiskBoostFactor: float = 0.25  # rfac = 1 + factor * risk_level
class CriticalRequirementsFilter(BaseModel):
    """Filter for critical requirements section in PDF reports."""

    FilterField: str  # attribute key to filter on, e.g. "LevelOfRisk"
    MinValue: Optional[int] = None  # e.g. 4 (int-based filter)
    FilterValue: Optional[str] = None  # e.g. "alto" (string-based filter)
    StatusFilter: str = "FAIL"  # only requirements with this status are listed
    Title: Optional[str] = None  # e.g. "Critical Failed Requirements"
class ReportFilter(BaseModel):
    """Default report filtering for PDF generation."""

    OnlyFailed: bool = True  # when True, only FAIL requirements are reported
    IncludeManual: bool = False  # when True, MANUAL requirements are included
class I18nLabels(BaseModel):
    """Localized labels for PDF report rendering."""

    ReportTitle: Optional[str] = None
    PageLabel: str = "Page"
    PoweredBy: str = "Powered by Prowler"
    FrameworkLabel: str = "Framework:"
    VersionLabel: str = "Version:"
    ProviderLabel: str = "Provider:"
    DescriptionLabel: str = "Description:"
    ComplianceScoreLabel: str = "Compliance Score by Sections"
    RequirementsIndexLabel: str = "Requirements Index"
    DetailedFindingsLabel: str = "Detailed Findings"
class PDFConfig(BaseModel):
    """Declarative PDF report configuration.

    Drives the API report generator from JSON data instead of hardcoded
    Python config. Colors are hex strings (e.g. '#336699').
    """

    Language: str = "en"
    LogoFilename: Optional[str] = None
    PrimaryColor: Optional[str] = None
    SecondaryColor: Optional[str] = None
    BgColor: Optional[str] = None
    Sections: Optional[list] = None  # ordered section identifiers
    SectionShortNames: Optional[dict] = None  # section id -> short display name
    GroupByField: Optional[str] = None  # attribute key for top-level grouping
    SubGroupByField: Optional[str] = None  # attribute key for nested grouping
    SectionTitles: Optional[dict] = None  # section id -> title string
    Charts: Optional[list] = None  # list of ChartConfig-shaped dicts
    Scoring: Optional[ScoringFormula] = None
    CriticalFilter: Optional[CriticalRequirementsFilter] = None
    Filter: Optional[ReportFilter] = None
    Labels: Optional[I18nLabels] = None
class UniversalComplianceRequirement(BaseModel):
    """Universal requirement with flat dict-based attributes."""

    Id: str
    Description: str
    Name: Optional[str] = None
    Attributes: dict = Field(default_factory=dict)  # flat attribute key -> value
    Checks: Union[list, dict] = Field(default_factory=list)  # list of check ids, or dict keyed by provider
    # MITRE ATT&CK specific fields (None for non-MITRE frameworks)
    Tactics: Optional[list] = None
    SubTechniques: Optional[list] = None
    Platforms: Optional[list] = None
    TechniqueURL: Optional[str] = None
class OutputsConfig(BaseModel):
    """Container for output-related configuration (table, PDF, etc.)."""

    class Config:
        # Accept both the field names and their "TableConfig"/"PDFConfig" aliases.
        allow_population_by_field_name = True

    Table_Config: Optional[TableConfig] = Field(None, alias="TableConfig")
    PDF_Config: Optional[PDFConfig] = Field(None, alias="PDFConfig")
class ComplianceFramework(BaseModel):
    """Universal top-level container for any compliance framework.

    Provider may be explicit (single-provider JSON) or derived from Checks
    keys when Checks is a dict keyed by provider.
    """

    Framework: str
    Name: str
    Provider: Optional[str] = None
    Version: Optional[str] = None
    Description: str
    Icon: Optional[str] = None
    Requirements: list[UniversalComplianceRequirement]
    AttributesMetadata: Optional[list[AttributeMetadata]] = None
    Outputs: Optional[OutputsConfig] = None

    @root_validator(pre=True)
    # noqa: F841 - since vulture raises unused variable 'cls'
    def migrate_legacy_output_fields(cls, values):  # noqa: F841
        """Move top-level TableConfig/PDFConfig into Outputs for backward compat."""
        tc = values.pop("TableConfig", None)
        pc = values.pop("PDFConfig", None)
        if tc is not None or pc is not None:
            outputs = values.get("Outputs") or {}
            # Normalize an already-parsed OutputsConfig back to a dict so
            # legacy values can be merged without clobbering existing keys.
            if isinstance(outputs, OutputsConfig):
                outputs = outputs.dict()
            # Explicit Outputs entries win over legacy top-level fields.
            if tc is not None and "TableConfig" not in outputs:
                outputs["TableConfig"] = tc
            if pc is not None and "PDFConfig" not in outputs:
                outputs["PDFConfig"] = pc
            values["Outputs"] = outputs
        return values

    @root_validator
    # noqa: F841 - since vulture raises unused variable 'cls'
    def validate_attributes_against_metadata(cls, values):  # noqa: F841
        """Validate every Requirement's Attributes dict against AttributesMetadata.

        Checks:
        - Required keys (Required=True) must be present in each Requirement.
        - Enum-constrained keys must have a value within the declared Enum list.
        - Basic type validation (int, float, bool) for non-None values.
        """
        metadata = values.get("AttributesMetadata")
        requirements = values.get("Requirements", [])
        if not metadata:
            # No metadata declared: nothing to validate against.
            return values
        required_keys = {m.Key for m in metadata if m.Required}
        valid_keys = {m.Key for m in metadata}
        enum_map = {m.Key: m.Enum for m in metadata if m.Enum}
        type_map = {m.Key: m.Type for m in metadata}
        # "str" is intentionally absent: string attributes are not type-checked.
        type_checks = {
            "int": int,
            "float": (int, float),
            "bool": bool,
        }
        errors = []
        for req in requirements:
            attrs = req.Attributes
            # Required keys
            for key in required_keys:
                if key not in attrs or attrs[key] is None:
                    errors.append(
                        f"Requirement '{req.Id}': missing required attribute '{key}'"
                    )
            # Enum validation
            for key, allowed in enum_map.items():
                if key in attrs and attrs[key] is not None:
                    if attrs[key] not in allowed:
                        errors.append(
                            f"Requirement '{req.Id}': attribute '{key}' value "
                            f"'{attrs[key]}' not in {allowed}"
                        )
            # Type validation for non-string types
            for key in attrs:
                if key not in valid_keys or attrs[key] is None:
                    continue
                expected_type = type_map.get(key, "str")
                py_type = type_checks.get(expected_type)
                if py_type and not isinstance(attrs[key], py_type):
                    errors.append(
                        f"Requirement '{req.Id}': attribute '{key}' expected "
                        f"type {expected_type}, got {type(attrs[key]).__name__}"
                    )
        if errors:
            # Aggregate all problems into one ValidationError for the caller.
            detail = "\n ".join(errors)
            raise ValueError(f"AttributesMetadata validation failed:\n {detail}")
        return values

    def get_providers(self) -> list:
        """Derive the set of providers this framework supports.

        Inspects Checks keys across all requirements. Falls back to the
        explicit Provider field for single-provider frameworks.
        """
        providers = set()
        for req in self.Requirements:
            if isinstance(req.Checks, dict):
                providers.update(k.lower() for k in req.Checks.keys())
        if self.Provider and not providers:
            providers.add(self.Provider.lower())
        return sorted(providers)

    def supports_provider(self, provider: str) -> bool:
        """Return True if this framework has checks for the given provider."""
        provider_lower = provider.lower()
        for req in self.Requirements:
            if isinstance(req.Checks, dict):
                if provider_lower in (k.lower() for k in req.Checks.keys()):
                    return True
            elif isinstance(req.Checks, list) and req.Checks:
                # List-style checks: rely on explicit Provider field
                if self.Provider and self.Provider.lower() == provider_lower:
                    return True
        return False
# ─── Legacy-to-Universal Adapter (Phase 2) ──────────────────────────────────
def _infer_attribute_metadata(legacy: Compliance) -> Optional[list[AttributeMetadata]]:
    """Introspect the first requirement's attribute model to build AttributesMetadata.

    Returns None when inference is not possible: no requirements, MITRE
    frameworks (their fields live at the requirement top level, not in
    Attributes), or any unexpected introspection failure.
    """
    try:
        if not legacy.Requirements:
            return None
        first_req = legacy.Requirements[0]
        # MITRE requirements have Tactics at top level, not in Attributes
        if isinstance(first_req, Mitre_Requirement):
            return None
        if not first_req.Attributes:
            return None
        sample_attr = first_req.Attributes[0]
        metadata = []
        for field_name, field_obj in sample_attr.__fields__.items():
            field_type = field_obj.outer_type_
            type_str = "str"
            enum_values = None
            origin = getattr(field_type, "__origin__", None)
            if field_type is int:
                type_str = "int"
            elif field_type is float:
                type_str = "float"
            elif field_type is bool:
                type_str = "bool"
            elif origin is list:
                args = getattr(field_type, "__args__", ())
                # Unwrap typing generics so both list[dict] and List[Dict[...]]
                # are classified as list_dict (typing.Dict is not `dict`).
                elem = getattr(args[0], "__origin__", args[0]) if args else None
                if elem is dict:
                    type_str = "list_dict"
                else:
                    type_str = "list_str"
            elif isinstance(field_type, type) and issubclass(field_type, Enum):
                type_str = "str"
                enum_values = [e.value for e in field_type]
            metadata.append(
                AttributeMetadata(
                    Key=field_name,
                    Type=type_str,
                    Enum=enum_values,
                    Required=field_obj.required,
                )
            )
        return metadata
    except Exception as e:
        # Best-effort inference: never fail the caller, but leave a trace
        # instead of swallowing the error silently.
        logger.debug(f"Could not infer attribute metadata: {e}")
        return None
def adapt_legacy_to_universal(legacy: Compliance) -> ComplianceFramework:
    """Convert a legacy Compliance object to a ComplianceFramework."""
    converted = []
    for req in legacy.Requirements:
        if isinstance(req, Mitre_Requirement):
            # MITRE: keep the raw attribute dicts and promote the special
            # top-level fields (Tactics, SubTechniques, Platforms, URL).
            converted.append(
                UniversalComplianceRequirement(
                    Id=req.Id,
                    Description=req.Description,
                    Name=req.Name,
                    Attributes={
                        "_raw_attributes": [a.dict() for a in req.Attributes]
                    },
                    Checks=req.Checks,
                    Tactics=req.Tactics,
                    SubTechniques=req.SubTechniques,
                    Platforms=req.Platforms,
                    TechniqueURL=req.TechniqueURL,
                )
            )
        else:
            # Standard requirement: flatten the first attribute into a dict.
            flat_attrs = req.Attributes[0].dict() if req.Attributes else {}
            converted.append(
                UniversalComplianceRequirement(
                    Id=req.Id,
                    Description=req.Description,
                    Name=req.Name,
                    Attributes=flat_attrs,
                    Checks=req.Checks,
                )
            )
    return ComplianceFramework(
        Framework=legacy.Framework,
        Name=legacy.Name,
        Provider=legacy.Provider,
        Version=legacy.Version,
        Description=legacy.Description,
        Requirements=converted,
        AttributesMetadata=_infer_attribute_metadata(legacy),
    )
def load_compliance_framework_universal(path: str) -> Optional[ComplianceFramework]:
    """Load a compliance JSON as a ComplianceFramework, handling both new and legacy formats.

    Returns None (after logging the error) when the file cannot be read or
    parsed — the annotation is Optional accordingly.
    """
    try:
        # Explicit UTF-8 so parsing does not depend on the platform locale.
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if "AttributesMetadata" in data:
            # New universal format — parse directly
            return ComplianceFramework(**data)
        # Legacy format — parse as Compliance, then adapt
        legacy = Compliance(**data)
        return adapt_legacy_to_universal(legacy)
    except Exception as e:
        logger.error(
            f"Failed to load universal compliance framework from {path}: "
            f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}"
        )
        return None
def _load_jsons_from_dir(dir_path: str, provider: str, bulk: dict) -> None:
    """Scan *dir_path* for JSON files and add matching frameworks to *bulk*.

    A framework is added when its explicit Provider matches (case-insensitive)
    or when any of its requirements declares checks for *provider*.
    Framework names already present in *bulk* are never overwritten.
    """
    for filename in os.listdir(dir_path):
        file_path = os.path.join(dir_path, filename)
        # Only non-empty regular .json files are considered.
        if not (
            os.path.isfile(file_path)
            and filename.endswith(".json")
            and os.stat(file_path).st_size > 0
        ):
            continue
        # Strip only the trailing extension; split(".json")[0] would truncate
        # any name containing ".json" in the middle.
        framework_name = filename.removesuffix(".json")
        if framework_name in bulk:
            continue
        fw = load_compliance_framework_universal(file_path)
        if fw is None:
            continue
        # Explicit Provider match OR dict-style Checks mentioning the provider.
        if (
            fw.Provider and fw.Provider.lower() == provider.lower()
        ) or fw.supports_provider(provider):
            bulk[framework_name] = fw
def get_bulk_compliance_frameworks_universal(provider: str) -> dict:
    """Bulk load all compliance frameworks relevant to the given provider.

    Scans:
    1. The **top-level** ``prowler/compliance/`` directory for multi-provider
       JSONs (``Checks`` keyed by provider, no ``Provider`` field).
    2. Every **provider sub-directory** (``prowler/compliance/{p}/``) so that
       single-provider JSONs are also picked up.

    A framework is included when its explicit ``Provider`` matches
    (case-insensitive) **or** any requirement has dict-style ``Checks``
    with a key for *provider*.

    Returns a dict mapping framework file name (without .json) to the loaded
    ComplianceFramework; empty on failure (errors are logged, not raised).
    """
    bulk = {}
    try:
        available_modules = list_compliance_modules()
        # Resolve the compliance root once (parent of provider sub-dirs).
        compliance_root = None
        seen_paths = set()
        for module in available_modules:
            dir_path = f"{module.module_finder.path}/{module.name.split('.')[-1]}"
            if not os.path.isdir(dir_path) or dir_path in seen_paths:
                continue
            seen_paths.add(dir_path)
            # Remember the root the first time we see a valid sub-dir.
            if compliance_root is None:
                compliance_root = module.module_finder.path
            _load_jsons_from_dir(dir_path, provider, bulk)
        # Also scan top-level compliance/ for provider-agnostic JSONs.
        if compliance_root and os.path.isdir(compliance_root):
            _load_jsons_from_dir(compliance_root, provider, bulk)
    except Exception as e:
        logger.error(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}")
    return bulk

View File

@@ -0,0 +1,401 @@
import os
from datetime import datetime
from typing import List
from py_ocsf_models.events.base_event import SeverityID
from py_ocsf_models.events.base_event import StatusID as EventStatusID
from py_ocsf_models.events.findings.compliance_finding import ComplianceFinding
from py_ocsf_models.events.findings.compliance_finding_type_id import (
ComplianceFindingTypeID,
)
from py_ocsf_models.events.findings.finding import ActivityID, FindingInformation
from py_ocsf_models.objects.check import Check
from py_ocsf_models.objects.compliance import Compliance
from py_ocsf_models.objects.compliance_status import StatusID as ComplianceStatusID
from py_ocsf_models.objects.group import Group
from py_ocsf_models.objects.metadata import Metadata
from py_ocsf_models.objects.product import Product
from py_ocsf_models.objects.resource_details import ResourceDetails
from prowler.config.config import prowler_version
from prowler.lib.check.compliance_models import ComplianceFramework
from prowler.lib.logger import logger
from prowler.lib.outputs.finding import Finding
from prowler.lib.outputs.ocsf.ocsf import OCSF
from prowler.lib.outputs.utils import unroll_dict_to_list
from prowler.lib.utils.utils import open_file
# Map Prowler finding statuses to OCSF compliance status IDs.
# MANUAL has no direct OCSF equivalent, so it maps to Unknown.
PROWLER_TO_COMPLIANCE_STATUS = {
    "PASS": ComplianceStatusID.Pass,
    "FAIL": ComplianceStatusID.Fail,
    "MANUAL": ComplianceStatusID.Unknown,
}
def _to_snake_case(name: str) -> str:
"""Convert a PascalCase or camelCase string to snake_case."""
import re
# Insert underscore before uppercase letters preceded by lowercase
s = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
# Insert underscore between consecutive uppercase and following lowercase
s = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", s)
return s.lower()
def _build_requirement_attrs(requirement, framework) -> dict:
    """Build a dict with requirement attributes for the unmapped section.

    Keys are normalized to snake_case for OCSF consistency.
    Only includes attributes whose AttributeMetadata has OCSF=True.
    When no metadata is declared, all attributes are included.
    """
    attrs = requirement.Attributes
    if not attrs:
        return {}
    # None sentinel means "no metadata declared -> keep everything".
    metadata = framework.AttributesMetadata
    allowed = {m.Key for m in metadata if m.OCSF} if metadata else None
    return {
        _to_snake_case(key): value
        for key, value in attrs.items()
        if allowed is None or key in allowed
    }
class OCSFComplianceOutput:
    """Produces OCSF ComplianceFinding (class_uid 2003) events from
    universal compliance framework data.

    Each finding × requirement combination produces one ComplianceFinding event
    with structured Compliance and Check objects.
    """

    def __init__(
        self,
        findings: list,
        framework: ComplianceFramework,
        file_path: str = None,
        from_cli: bool = True,
        provider: str = None,
    ) -> None:
        """Transform *findings* into OCSF events and open the output file.

        Args:
            findings: Prowler Finding objects to map onto requirements.
            framework: universal compliance framework being reported.
            file_path: destination file; when falsy, nothing is opened.
            from_cli: when True, batch_write_data_to_file finalizes the file.
            provider: provider key used to filter dict-style Checks.
        """
        self._data = []
        self._file_descriptor = None
        self.file_path = file_path
        self._from_cli = from_cli
        self._provider = provider
        self.close_file = False
        if findings:
            # Standard compliance identifier: "<Framework>-<Version>",
            # version omitted when the framework has none.
            compliance_name = (
                framework.Framework + "-" + framework.Version
                if framework.Version
                else framework.Framework
            )
            self._transform(findings, framework, compliance_name)
        if not self._file_descriptor and file_path:
            self._create_file_descriptor(file_path)

    @property
    def data(self):
        # Generated ComplianceFinding events.
        return self._data

    def _transform(
        self,
        findings: List[Finding],
        framework: ComplianceFramework,
        compliance_name: str,
    ) -> None:
        """Map findings onto requirements; emit MANUAL events for unchecked ones."""
        # Build check -> requirements map (same logic as UniversalComplianceOutput)
        check_req_map = {}
        for req in framework.Requirements:
            checks = req.Checks
            if isinstance(checks, dict):
                # Dict-style Checks are keyed by provider name.
                if self._provider:
                    all_checks = checks.get(self._provider.lower(), [])
                else:
                    all_checks = []
                    for check_list in checks.values():
                        all_checks.extend(check_list)
            else:
                all_checks = checks
            for check_id in all_checks:
                check_req_map.setdefault(check_id, []).append(req)
        # One event per finding × mapped requirement.
        for finding in findings:
            if finding.check_id in check_req_map:
                for req in check_req_map[finding.check_id]:
                    cf = self._build_compliance_finding(
                        finding, framework, req, compliance_name
                    )
                    if cf:
                        self._data.append(cf)
        # Manual requirements (no checks or empty for current provider)
        for req in framework.Requirements:
            checks = req.Checks
            if isinstance(checks, dict):
                if self._provider:
                    has_checks = bool(checks.get(self._provider.lower(), []))
                else:
                    has_checks = any(checks.values())
            else:
                has_checks = bool(checks)
            if not has_checks:
                cf = self._build_manual_compliance_finding(
                    framework, req, compliance_name
                )
                if cf:
                    self._data.append(cf)

    def _build_unmapped(self, finding, requirement, framework) -> dict:
        """Build the unmapped dict with cloud info and requirement attributes.

        Returns None (not {}) when there is nothing to attach, so the field
        is omitted from the serialized event.
        """
        unmapped = {}
        # Cloud info (from finding, when available)
        if finding and getattr(finding, "provider", None) != "kubernetes":
            unmapped["cloud"] = {
                "provider": finding.provider,
                "region": finding.region,
                "account": {
                    "uid": finding.account_uid,
                    "name": finding.account_name,
                },
                "org": {
                    "uid": finding.account_organization_uid,
                    "name": finding.account_organization_name,
                },
            }
        # Requirement attributes
        req_attrs = _build_requirement_attrs(requirement, framework)
        if req_attrs:
            unmapped["requirement_attributes"] = req_attrs
        return unmapped or None

    def _build_compliance_finding(
        self,
        finding: Finding,
        framework: ComplianceFramework,
        requirement,
        compliance_name: str,
    ) -> ComplianceFinding:
        """Build one ComplianceFinding event for a finding/requirement pair.

        Returns None (after a debug log) if event construction fails, so one
        bad pair never aborts the whole output.
        """
        try:
            compliance_status = PROWLER_TO_COMPLIANCE_STATUS.get(
                finding.status, ComplianceStatusID.Unknown
            )
            check_status = PROWLER_TO_COMPLIANCE_STATUS.get(
                finding.status, ComplianceStatusID.Unknown
            )
            # Severity names in SeverityID are capitalized (e.g. "High").
            finding_severity = getattr(
                SeverityID,
                finding.metadata.Severity.capitalize(),
                SeverityID.Unknown,
            )
            event_status = OCSF.get_finding_status_id(finding.muted)
            # Accept either a datetime or an already-numeric timestamp.
            time_value = (
                int(finding.timestamp.timestamp())
                if isinstance(finding.timestamp, datetime)
                else finding.timestamp
            )
            cf = ComplianceFinding(
                activity_id=ActivityID.Create.value,
                activity_name=ActivityID.Create.name,
                compliance=Compliance(
                    standards=[compliance_name],
                    requirements=[requirement.Id],
                    control=requirement.Description,
                    status_id=compliance_status,
                    checks=[
                        Check(
                            uid=finding.check_id,
                            name=finding.metadata.CheckTitle,
                            desc=finding.metadata.Description,
                            status=finding.status,
                            status_id=check_status,
                        )
                    ],
                ),
                finding_info=FindingInformation(
                    # Suffix the requirement id to keep uids unique per pair.
                    uid=f"{finding.uid}-{requirement.Id}",
                    title=requirement.Id,
                    desc=requirement.Description,
                    created_time=time_value,
                    created_time_dt=(
                        finding.timestamp
                        if isinstance(finding.timestamp, datetime)
                        else None
                    ),
                ),
                message=finding.status_extended,
                metadata=Metadata(
                    event_code=finding.check_id,
                    product=Product(
                        uid="prowler",
                        name="Prowler",
                        vendor_name="Prowler",
                        version=finding.prowler_version,
                    ),
                    # Kubernetes findings use the container profile instead of cloud.
                    profiles=(
                        ["cloud", "datetime"]
                        if finding.provider != "kubernetes"
                        else ["container", "datetime"]
                    ),
                    tenant_uid=finding.account_organization_uid,
                ),
                resources=[
                    ResourceDetails(
                        labels=unroll_dict_to_list(finding.resource_tags),
                        name=finding.resource_name,
                        uid=finding.resource_uid,
                        group=Group(name=finding.metadata.ServiceName),
                        type=finding.metadata.ResourceType,
                        cloud_partition=(
                            finding.partition
                            if finding.provider != "kubernetes"
                            else None
                        ),
                        region=(
                            finding.region if finding.provider != "kubernetes" else None
                        ),
                        # For kubernetes, region carries "namespace: <name>".
                        namespace=(
                            finding.region.replace("namespace: ", "")
                            if finding.provider == "kubernetes"
                            else None
                        ),
                        data={
                            "details": finding.resource_details,
                            "metadata": finding.resource_metadata,
                        },
                    )
                ],
                severity_id=finding_severity.value,
                severity=finding_severity.name,
                status_id=event_status.value,
                status=event_status.name,
                status_code=finding.status,
                status_detail=finding.status_extended,
                time=time_value,
                time_dt=(
                    finding.timestamp
                    if isinstance(finding.timestamp, datetime)
                    else None
                ),
                type_uid=ComplianceFindingTypeID.Create,
                type_name=f"Compliance Finding: {ComplianceFindingTypeID.Create.name}",
                unmapped=self._build_unmapped(finding, requirement, framework),
            )
            return cf
        except Exception as e:
            logger.debug(f"Skipping OCSF compliance finding for {requirement.Id}: {e}")
            return None

    def _build_manual_compliance_finding(
        self,
        framework: ComplianceFramework,
        requirement,
        compliance_name: str,
    ) -> ComplianceFinding:
        """Build a MANUAL ComplianceFinding for a requirement without checks.

        Returns None (after a debug log) on construction failure.
        """
        try:
            # Scan-wide timestamp; imported lazily to avoid import-time cycles.
            from prowler.config.config import timestamp as config_timestamp

            time_value = int(config_timestamp.timestamp())
            return ComplianceFinding(
                activity_id=ActivityID.Create.value,
                activity_name=ActivityID.Create.name,
                compliance=Compliance(
                    standards=[compliance_name],
                    requirements=[requirement.Id],
                    control=requirement.Description,
                    status_id=ComplianceStatusID.Unknown,
                ),
                finding_info=FindingInformation(
                    uid=f"manual-{requirement.Id}",
                    title=requirement.Id,
                    desc=requirement.Description,
                    created_time=time_value,
                ),
                message="Manual check",
                metadata=Metadata(
                    event_code="manual",
                    product=Product(
                        uid="prowler",
                        name="Prowler",
                        vendor_name="Prowler",
                        version=prowler_version,
                    ),
                ),
                severity_id=SeverityID.Informational.value,
                severity=SeverityID.Informational.name,
                status_id=EventStatusID.New.value,
                status=EventStatusID.New.name,
                status_code="MANUAL",
                status_detail="Manual check",
                time=time_value,
                type_uid=ComplianceFindingTypeID.Create,
                type_name=f"Compliance Finding: {ComplianceFindingTypeID.Create.name}",
                unmapped=self._build_unmapped(None, requirement, framework),
            )
        except Exception as e:
            logger.debug(
                f"Skipping manual OCSF compliance finding for {requirement.Id}: {e}"
            )
            return None

    def _create_file_descriptor(self, file_path: str) -> None:
        """Open *file_path* in append mode, logging (not raising) any failure."""
        try:
            self._file_descriptor = open_file(file_path, "a")
        except Exception as error:
            logger.error(
                f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
            )

    def batch_write_data_to_file(self) -> None:
        """Write ComplianceFinding events to a JSON array file.

        Events are written as "<json>," entries; on finalization the trailing
        comma is replaced with the closing bracket.
        """
        try:
            if (
                getattr(self, "_file_descriptor", None)
                and not self._file_descriptor.closed
                and self._data
            ):
                # Fresh file: open the JSON array.
                if self._file_descriptor.tell() == 0:
                    self._file_descriptor.write("[")
                for finding in self._data:
                    try:
                        # Support both pydantic v2 (model_dump_json) and v1 (json).
                        if hasattr(finding, "model_dump_json"):
                            json_output = finding.model_dump_json(
                                exclude_none=True, indent=4
                            )
                        else:
                            json_output = finding.json(exclude_none=True, indent=4)
                        self._file_descriptor.write(json_output)
                        self._file_descriptor.write(",")
                    except Exception as error:
                        logger.error(
                            f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
                        )
                if self.close_file or self._from_cli:
                    # tell() == 1 would mean only "[" was written (no entries);
                    # otherwise drop the trailing comma and close the array.
                    if self._file_descriptor.tell() != 1:
                        self._file_descriptor.seek(
                            self._file_descriptor.tell() - 1, os.SEEK_SET
                        )
                        self._file_descriptor.truncate()
                        self._file_descriptor.write("]")
                    self._file_descriptor.close()
        except Exception as error:
            logger.error(
                f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
            )

View File

@@ -0,0 +1,298 @@
from csv import DictWriter
from pathlib import Path
from typing import Optional
from pydantic.v1 import create_model
from prowler.config.config import timestamp
from prowler.lib.check.compliance_models import ComplianceFramework
from prowler.lib.logger import logger
from prowler.lib.outputs.finding import Finding
from prowler.lib.utils.utils import open_file
PROVIDER_HEADER_MAP = {
"aws": ("AccountId", "account_uid", "Region", "region"),
"azure": ("SubscriptionId", "account_uid", "Location", "region"),
"gcp": ("ProjectId", "account_uid", "Location", "region"),
"kubernetes": ("Context", "account_name", "Namespace", "region"),
"m365": ("TenantId", "account_uid", "Location", "region"),
"github": ("Account_Name", "account_name", "Account_Id", "account_uid"),
"oraclecloud": ("TenancyId", "account_uid", "Region", "region"),
"alibabacloud": ("AccountId", "account_uid", "Region", "region"),
"nhn": ("AccountId", "account_uid", "Region", "region"),
}
_DEFAULT_HEADERS = ("AccountId", "account_uid", "Region", "region")
class UniversalComplianceOutput:
"""Universal compliance CSV output driven by ComplianceFramework metadata.
Dynamically builds a Pydantic row model from AttributesMetadata so that
CSV columns match the framework's declared attribute fields.
"""
    def __init__(
        self,
        findings: list,
        framework: ComplianceFramework,
        file_path: str = None,
        from_cli: bool = True,
        provider: str = None,
    ) -> None:
        """Build CSV row models for *findings* against *framework*.

        Args:
            findings: Prowler Finding objects to map onto requirements.
            framework: universal compliance framework driving the columns.
            file_path: destination path; opened in append mode when given.
            from_cli: when True, batch writing finalizes the file.
            provider: provider key used to filter dict-style Checks.
        """
        self._data = []
        self._file_descriptor = None
        self.file_path = file_path
        self._from_cli = from_cli
        self._provider = provider
        self.close_file = False
        if file_path:
            path_obj = Path(file_path)
            # Remember the extension (e.g. ".csv") for downstream writers.
            self._file_extension = path_obj.suffix if path_obj.suffix else ""
        if findings:
            # The row model depends on the framework's declared attributes.
            self._row_model = self._build_row_model(framework)
            compliance_name = (
                framework.Framework + "-" + framework.Version
                if framework.Version
                else framework.Framework
            )
            self._transform(findings, framework, compliance_name)
        if not self._file_descriptor and file_path:
            self._create_file_descriptor(file_path)
    @property
    def data(self):
        # Transformed row-model instances ready for CSV serialization.
        return self._data
    def _build_row_model(self, framework: ComplianceFramework):
        """Build a dynamic Pydantic model from AttributesMetadata.

        Also caches the provider-specific account/location header and field
        names on self for reuse by _build_row.
        """
        acct_header, acct_field, loc_header, loc_field = PROVIDER_HEADER_MAP.get(
            (self._provider or "").lower(), _DEFAULT_HEADERS
        )
        self._acct_header = acct_header
        self._acct_field = acct_field
        self._loc_header = loc_header
        self._loc_field = loc_field
        # Base fields present in every compliance CSV
        fields = {
            "Provider": (str, ...),
            "Description": (str, ...),
            acct_header: (str, ...),
            loc_header: (str, ...),
            "AssessmentDate": (str, ...),
            "Requirements_Id": (str, ...),
            "Requirements_Description": (str, ...),
        }
        # Dynamic attribute columns from metadata
        if framework.AttributesMetadata:
            for attr_meta in framework.AttributesMetadata:
                # Skip attributes excluded from CSV output.
                if not attr_meta.CSV:
                    continue
                field_name = f"Requirements_Attributes_{attr_meta.Key}"
                # Map type strings to Python types
                type_map = {
                    "str": Optional[str],
                    "int": Optional[int],
                    "float": Optional[float],
                    "bool": Optional[bool],
                    "list_str": Optional[str],  # Serialized as joined string
                    "list_dict": Optional[str],  # Serialized as string
                }
                py_type = type_map.get(attr_meta.Type, Optional[str])
                fields[field_name] = (py_type, None)
        # Check if any requirement has MITRE fields
        has_mitre = any(req.Tactics for req in framework.Requirements if req.Tactics)
        if has_mitre:
            fields["Requirements_Tactics"] = (Optional[str], None)
            fields["Requirements_SubTechniques"] = (Optional[str], None)
            fields["Requirements_Platforms"] = (Optional[str], None)
            fields["Requirements_TechniqueURL"] = (Optional[str], None)
        # Trailing fields
        fields["Status"] = (str, ...)
        fields["StatusExtended"] = (str, ...)
        fields["ResourceId"] = (str, ...)
        fields["ResourceName"] = (str, ...)
        fields["CheckId"] = (str, ...)
        fields["Muted"] = (bool, ...)
        fields["Framework"] = (str, ...)
        fields["Name"] = (str, ...)
        return create_model("UniversalComplianceRow", **fields)
def _serialize_attr_value(self, value):
"""Serialize attribute values for CSV."""
if isinstance(value, list):
if value and isinstance(value[0], dict):
return str(value)
return " | ".join(str(v) for v in value)
return value
    def _build_row(self, finding, framework, requirement, is_manual=False):
        """Build a single row dict for a finding + requirement combination.

        When is_manual is True, *finding* is ignored for finding-specific
        fields and MANUAL placeholders are emitted instead.
        """
        row = {
            "Provider": (
                finding.provider
                if not is_manual
                else (framework.Provider or self._provider or "").lower()
            ),
            "Description": framework.Description,
            # Provider-specific account/location headers cached by
            # _build_row_model.
            self._acct_header: (
                getattr(finding, self._acct_field, "") if not is_manual else ""
            ),
            self._loc_header: (
                getattr(finding, self._loc_field, "") if not is_manual else ""
            ),
            "AssessmentDate": str(timestamp),
            "Requirements_Id": requirement.Id,
            "Requirements_Description": requirement.Description,
        }
        # Add dynamic attribute columns
        if framework.AttributesMetadata:
            for attr_meta in framework.AttributesMetadata:
                if not attr_meta.CSV:
                    continue
                field_name = f"Requirements_Attributes_{attr_meta.Key}"
                raw_val = requirement.Attributes.get(attr_meta.Key)
                row[field_name] = (
                    self._serialize_attr_value(raw_val) if raw_val is not None else None
                )
        # MITRE fields
        if requirement.Tactics:
            row["Requirements_Tactics"] = (
                " | ".join(requirement.Tactics) if requirement.Tactics else None
            )
            row["Requirements_SubTechniques"] = (
                " | ".join(requirement.SubTechniques)
                if requirement.SubTechniques
                else None
            )
            row["Requirements_Platforms"] = (
                " | ".join(requirement.Platforms) if requirement.Platforms else None
            )
            row["Requirements_TechniqueURL"] = requirement.TechniqueURL
        row["Status"] = finding.status if not is_manual else "MANUAL"
        row["StatusExtended"] = (
            finding.status_extended if not is_manual else "Manual check"
        )
        row["ResourceId"] = finding.resource_uid if not is_manual else "manual_check"
        row["ResourceName"] = finding.resource_name if not is_manual else "Manual check"
        row["CheckId"] = finding.check_id if not is_manual else "manual"
        row["Muted"] = finding.muted if not is_manual else False
        row["Framework"] = framework.Framework
        row["Name"] = framework.Name
        return row
def _transform(
    self,
    findings: list[Finding],
    framework: ComplianceFramework,
    compliance_name: str,
) -> None:
    """Turn *findings* into universal compliance rows in ``self._data``.

    Requirements whose check list resolves to empty (after provider
    filtering for multi-provider dict ``Checks``) are emitted as MANUAL
    rows instead.
    """
    provider_key = self._provider.lower() if self._provider else None

    def _resolve_checks(req):
        # Dict Checks are keyed by provider; flatten across all providers
        # only when no provider filter is active.
        checks = req.Checks
        if not isinstance(checks, dict):
            return checks
        if provider_key is not None:
            return checks.get(provider_key, [])
        flattened = []
        for check_list in checks.values():
            flattened.extend(check_list)
        return flattened

    # check_id -> requirements referencing it (provider-filtered), so for
    # multi-provider dict Checks only the current provider's checks
    # produce output rows.
    check_req_map = {}
    for req in framework.Requirements:
        for check_id in _resolve_checks(req):
            check_req_map.setdefault(check_id, []).append(req)

    for finding in findings:
        for req in check_req_map.get(finding.check_id, []):
            row = self._build_row(finding, framework, req)
            try:
                self._data.append(self._row_model(**row))
            except Exception as e:
                logger.debug(f"Skipping row for {req.Id}: {e}")

    # Requirements with no (provider-relevant) checks become manual rows,
    # driven by a stub finding object.
    for req in framework.Requirements:
        if _resolve_checks(req):
            continue
        row = self._build_row(_ManualFindingStub(), framework, req, is_manual=True)
        try:
            self._data.append(self._row_model(**row))
        except Exception as e:
            logger.debug(f"Skipping manual row for {req.Id}: {e}")
def _create_file_descriptor(self, file_path: str) -> None:
    """Open *file_path* in append mode and store the handle on the instance.

    Errors are logged rather than raised, so a bad output path does not
    abort the run; ``batch_write_data_to_file`` re-checks the descriptor
    before writing.
    """
    try:
        self._file_descriptor = open_file(file_path, "a")
    except Exception as error:
        logger.error(
            f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
        )
def batch_write_data_to_file(self) -> None:
    """Write the accumulated rows to the CSV file, emitting the header once."""
    try:
        descriptor = getattr(self, "_file_descriptor", None)
        # Nothing to do without an open descriptor or pending rows.
        if not descriptor or descriptor.closed or not self._data:
            return
        header = [field.upper() for field in self._data[0].dict().keys()]
        csv_writer = DictWriter(descriptor, fieldnames=header, delimiter=";")
        # Offset 0 means nothing has been appended yet: write the header.
        if descriptor.tell() == 0:
            csv_writer.writeheader()
        for row in self._data:
            csv_writer.writerow({k.upper(): v for k, v in row.dict().items()})
        if self.close_file or self._from_cli:
            descriptor.close()
    except Exception as error:
        logger.error(
            f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
        )
class _ManualFindingStub:
"""Minimal stub to satisfy _build_row for manual requirements."""
provider = ""
account_uid = ""
account_name = ""
region = ""
status = "MANUAL"
status_extended = "Manual check"
resource_uid = "manual_check"
resource_name = "Manual check"
check_id = "manual"
muted = False

View File

@@ -0,0 +1,494 @@
from colorama import Fore, Style
from tabulate import tabulate
from prowler.config.config import orange_color
from prowler.lib.check.compliance_models import ComplianceFramework
def get_universal_table(
    findings: list,
    bulk_checks_metadata: dict,
    compliance_framework_name: str,
    output_filename: str,
    output_directory: str,
    compliance_overview: bool,
    framework: ComplianceFramework = None,
    provider: str = None,
    output_formats: list = None,
) -> None:
    """Render a compliance console table driven by TableConfig.

    Dispatches on the framework's ``Outputs.Table_Config`` to one of
    three renderers:

    - Scored: GroupBy + Scoring (ThreatScore weighted risk %)
    - Split: GroupBy + SplitBy (CIS Level 1/2, ENS alto/medio/bajo/opcional)
    - Grouped: GroupBy only (generic, C5, CSA, ISO, KISA)

    When ``provider`` is given and ``Checks`` is a multi-provider dict,
    only the checks for that provider are matched against findings.
    """
    # No table configuration means nothing to render.
    if framework is None or not framework.Outputs or not framework.Outputs.Table_Config:
        return
    table_config = framework.Outputs.Table_Config
    labels = table_config.Labels or _default_labels()
    # Arguments shared by all three renderers, in their positional order.
    common_args = (
        findings,
        bulk_checks_metadata,
        compliance_framework_name,
        output_filename,
        output_directory,
        compliance_overview,
        framework,
        table_config.GroupBy,
    )
    if table_config.Scoring:
        _render_scored(
            *common_args,
            table_config.Scoring,
            labels,
            provider,
            output_formats=output_formats,
        )
    elif table_config.SplitBy:
        _render_split(
            *common_args,
            table_config.SplitBy,
            labels,
            provider,
            output_formats=output_formats,
        )
    else:
        _render_grouped(
            *common_args,
            labels,
            provider,
            output_formats=output_formats,
        )
def _default_labels():
    """Return a ``TableLabels`` instance populated with its default values."""
    from prowler.lib.check.compliance_models import TableLabels

    defaults = TableLabels()
    return defaults
def _build_requirement_check_map(framework, provider=None):
"""Build a map of check_id -> list of requirements for fast lookup.
When *provider* is given and a requirement's ``Checks`` is a dict keyed
by provider, only the checks for that provider are included.
"""
check_map = {}
for req in framework.Requirements:
checks = req.Checks
if isinstance(checks, dict):
if provider:
all_checks = checks.get(provider.lower(), [])
else:
all_checks = []
for check_list in checks.values():
all_checks.extend(check_list)
else:
all_checks = checks
for check_id in all_checks:
if check_id not in check_map:
check_map[check_id] = []
check_map[check_id].append(req)
return check_map
def _get_group_key(req, group_by):
"""Extract the group key from a requirement."""
if group_by == "_Tactics":
return req.Tactics or []
return [req.Attributes.get(group_by, "Unknown")]
def _print_overview(pass_count, fail_count, muted_count, framework_name, labels):
    """Print the pass/fail/muted summary table.

    Returns True when the summary was printed; False when fewer than two
    results exist overall, in which case nothing is rendered.
    """
    total = len(fail_count) + len(pass_count) + len(muted_count)
    if total < 2:
        return False
    title = (
        labels.Title
        or f"Compliance Status of {Fore.YELLOW}{framework_name.upper()}{Style.RESET_ALL} Framework:"
    )
    print(f"\n{title}")

    def _pct(bucket):
        # Share of the overall result count, rounded to two decimals.
        return round(len(bucket) / total * 100, 2)

    cells = [
        f"{Fore.RED}{_pct(fail_count)}% ({len(fail_count)}) {labels.FailLabel}{Style.RESET_ALL}",
        f"{Fore.GREEN}{_pct(pass_count)}% ({len(pass_count)}) {labels.PassLabel}{Style.RESET_ALL}",
        f"{orange_color}{_pct(muted_count)}% ({len(muted_count)}) MUTED{Style.RESET_ALL}",
    ]
    print(tabulate([cells], tablefmt="rounded_grid"))
    return True
def _render_grouped(
    findings,
    bulk_checks_metadata,
    compliance_framework_name,
    output_filename,
    output_directory,
    compliance_overview,
    framework,
    group_by,
    labels,
    provider=None,
    output_formats=None,
):
    """Grouped mode: one row per group with pass/fail counts.

    The global ``pass_count``/``fail_count``/``muted_count`` lists
    deduplicate findings by index so the overview counts each finding
    once, while the per-group counters tally every group a finding maps
    to (a finding may hit several requirements, and a MITRE requirement
    may carry several tactics).
    """
    check_map = _build_requirement_check_map(framework, provider)
    groups = {}
    pass_count = []
    fail_count = []
    muted_count = []
    for index, finding in enumerate(findings):
        check_id = finding.check_metadata.CheckID
        if check_id not in check_map:
            continue
        for req in check_map[check_id]:
            for group_key in _get_group_key(req, group_by):
                if group_key not in groups:
                    groups[group_key] = {"FAIL": 0, "PASS": 0, "Muted": 0}
                if finding.muted:
                    if index not in muted_count:
                        muted_count.append(index)
                    groups[group_key]["Muted"] += 1
                else:
                    # BUGFIX: per-group counters must increment for every
                    # group, not only the first one seen for a finding.
                    # Previously the increment was nested inside the global
                    # dedup check, so subsequent groups showed 0 counts;
                    # the muted branch above already counted per group.
                    if finding.status == "FAIL":
                        if index not in fail_count:
                            fail_count.append(index)
                        groups[group_key]["FAIL"] += 1
                    elif finding.status == "PASS":
                        if index not in pass_count:
                            pass_count.append(index)
                        groups[group_key]["PASS"] += 1
    if not _print_overview(
        pass_count, fail_count, muted_count, compliance_framework_name, labels
    ):
        return
    if not compliance_overview:
        provider_header = labels.ProviderHeader
        group_header = labels.GroupHeader or group_by
        table = {
            provider_header: [],
            group_header: [],
            labels.StatusHeader: [],
            "Muted": [],
        }
        for group_key in sorted(groups):
            table[provider_header].append(
                framework.Provider or (provider.upper() if provider else "")
            )
            table[group_header].append(group_key)
            # A group is red as soon as it has any FAIL; otherwise green.
            if groups[group_key]["FAIL"] > 0:
                table[labels.StatusHeader].append(
                    f"{Fore.RED}{labels.FailLabel}({groups[group_key]['FAIL']}){Style.RESET_ALL}"
                )
            else:
                table[labels.StatusHeader].append(
                    f"{Fore.GREEN}{labels.PassLabel}({groups[group_key]['PASS']}){Style.RESET_ALL}"
                )
            table["Muted"].append(
                f"{orange_color}{groups[group_key]['Muted']}{Style.RESET_ALL}"
            )
        results_title = (
            labels.ResultsTitle
            or f"Framework {Fore.YELLOW}{compliance_framework_name.upper()}{Style.RESET_ALL} Results:"
        )
        print(f"\n{results_title}")
        print(tabulate(table, headers="keys", tablefmt="rounded_grid"))
        footer = labels.FooterNote or "* Only sections containing results appear."
        print(f"{Style.BRIGHT}{footer}{Style.RESET_ALL}")
        print(f"\nDetailed results of {compliance_framework_name.upper()} are in:")
        print(
            f" - CSV: {output_directory}/compliance/{output_filename}_{compliance_framework_name}.csv"
        )
        if "json-ocsf" in (output_formats or []):
            print(
                f" - OCSF: {output_directory}/compliance/{output_filename}_{compliance_framework_name}.ocsf.json"
            )
        print()
def _render_split(
    findings,
    bulk_checks_metadata,
    compliance_framework_name,
    output_filename,
    output_directory,
    compliance_overview,
    framework,
    group_by,
    split_by,
    labels,
    provider=None,
    output_formats=None,
):
    """Split mode: one row per group with columns for each split value (e.g. Level 1/Level 2).

    Two independent tallies are kept: the global pass/fail/muted lists
    deduplicate findings by index for the overview, while the per-group
    per-split counters increment for every (group, split value) a
    non-muted finding maps to.
    """
    check_map = _build_requirement_check_map(framework, provider)
    split_field = split_by.Field
    split_values = split_by.Values
    groups = {}
    pass_count = []
    fail_count = []
    muted_count = []
    for index, finding in enumerate(findings):
        check_id = finding.check_metadata.CheckID
        if check_id not in check_map:
            continue
        for req in check_map[check_id]:
            for group_key in _get_group_key(req, group_by):
                if group_key not in groups:
                    # One FAIL/PASS bucket per split value, plus a shared
                    # muted counter for the whole group.
                    groups[group_key] = {
                        sv: {"FAIL": 0, "PASS": 0} for sv in split_values
                    }
                    groups[group_key]["Muted"] = 0
                split_val = req.Attributes.get(split_field, "")
                if finding.muted:
                    if index not in muted_count:
                        muted_count.append(index)
                    groups[group_key]["Muted"] += 1
                else:
                    if finding.status == "FAIL" and index not in fail_count:
                        fail_count.append(index)
                    elif finding.status == "PASS" and index not in pass_count:
                        pass_count.append(index)
                # Substring match so composite attribute values (e.g. a
                # requirement tagged with several levels) hit every
                # matching split column.
                for sv in split_values:
                    if sv in str(split_val):
                        if not finding.muted:
                            if finding.status == "FAIL":
                                groups[group_key][sv]["FAIL"] += 1
                            else:
                                groups[group_key][sv]["PASS"] += 1
    if not _print_overview(
        pass_count, fail_count, muted_count, compliance_framework_name, labels
    ):
        return
    if not compliance_overview:
        provider_header = labels.ProviderHeader
        group_header = labels.GroupHeader or group_by
        table = {provider_header: [], group_header: []}
        for sv in split_values:
            table[sv] = []
        table["Muted"] = []
        for group_key in sorted(groups):
            table[provider_header].append(
                framework.Provider or (provider.upper() if provider else "")
            )
            table[group_header].append(group_key)
            # Each split column is red as soon as it has any FAIL.
            for sv in split_values:
                if groups[group_key][sv]["FAIL"] > 0:
                    table[sv].append(
                        f"{Fore.RED}{labels.FailLabel}({groups[group_key][sv]['FAIL']}){Style.RESET_ALL}"
                    )
                else:
                    table[sv].append(
                        f"{Fore.GREEN}{labels.PassLabel}({groups[group_key][sv]['PASS']}){Style.RESET_ALL}"
                    )
            table["Muted"].append(
                f"{orange_color}{groups[group_key]['Muted']}{Style.RESET_ALL}"
            )
        results_title = (
            labels.ResultsTitle
            or f"Framework {Fore.YELLOW}{compliance_framework_name.upper()}{Style.RESET_ALL} Results:"
        )
        print(f"\n{results_title}")
        print(tabulate(table, headers="keys", tablefmt="rounded_grid"))
        footer = labels.FooterNote or "* Only sections containing results appear."
        print(f"{Style.BRIGHT}{footer}{Style.RESET_ALL}")
        print(f"\nDetailed results of {compliance_framework_name.upper()} are in:")
        print(
            f" - CSV: {output_directory}/compliance/{output_filename}_{compliance_framework_name}.csv"
        )
        if "json-ocsf" in (output_formats or []):
            print(
                f" - OCSF: {output_directory}/compliance/{output_filename}_{compliance_framework_name}.ocsf.json"
            )
        print()
def _render_scored(
    findings,
    bulk_checks_metadata,
    compliance_framework_name,
    output_filename,
    output_directory,
    compliance_overview,
    framework,
    group_by,
    scoring,
    labels,
    provider=None,
    output_formats=None,
):
    """Scored mode: weighted risk scoring per group (e.g. ThreatScore).

    A group's score is the sum of ``risk * weight`` over its passed
    findings divided by the sum over all its findings (muted excluded),
    deduplicated per group by finding index. The generic score applies
    the same formula across all findings. The pass/fail/muted lists
    deduplicate findings globally for the overview, while per-group
    FAIL/PASS counters tally every group a finding maps to.
    """
    check_map = _build_requirement_check_map(framework, provider)
    risk_field = scoring.RiskField
    weight_field = scoring.WeightField
    groups = {}
    pass_count = []
    fail_count = []
    muted_count = []
    score_per_group = {}
    max_score_per_group = {}
    counted_per_group = {}
    generic_score = 0
    max_generic_score = 0
    counted_generic = []
    for index, finding in enumerate(findings):
        check_id = finding.check_metadata.CheckID
        if check_id not in check_map:
            continue
        for req in check_map[check_id]:
            for group_key in _get_group_key(req, group_by):
                attrs = req.Attributes
                risk = attrs.get(risk_field, 0)
                weight = attrs.get(weight_field, 0)
                if group_key not in groups:
                    groups[group_key] = {"FAIL": 0, "PASS": 0, "Muted": 0}
                    score_per_group[group_key] = 0
                    max_score_per_group[group_key] = 0
                    counted_per_group[group_key] = []
                # Score each non-muted finding once per group: PASS adds
                # to the numerator, every status adds to the denominator.
                if index not in counted_per_group[group_key] and not finding.muted:
                    if finding.status == "PASS":
                        score_per_group[group_key] += risk * weight
                    max_score_per_group[group_key] += risk * weight
                    counted_per_group[group_key].append(index)
                if finding.muted:
                    if index not in muted_count:
                        muted_count.append(index)
                    groups[group_key]["Muted"] += 1
                else:
                    # BUGFIX: per-group counters must increment for every
                    # group a finding maps to; previously nested inside the
                    # global dedup check, so only the first group seen got
                    # its FAIL/PASS tallied (the muted branch above already
                    # counted per group).
                    if finding.status == "FAIL":
                        if index not in fail_count:
                            fail_count.append(index)
                        groups[group_key]["FAIL"] += 1
                    elif finding.status == "PASS":
                        if index not in pass_count:
                            pass_count.append(index)
                        groups[group_key]["PASS"] += 1
                    # Generic score counts each non-muted finding once
                    # globally, regardless of group.
                    if index not in counted_generic:
                        if finding.status == "PASS":
                            generic_score += risk * weight
                        max_generic_score += risk * weight
                        counted_generic.append(index)
    if not _print_overview(
        pass_count, fail_count, muted_count, compliance_framework_name, labels
    ):
        return
    if not compliance_overview:
        provider_header = labels.ProviderHeader
        group_header = labels.GroupHeader or group_by
        table = {
            provider_header: [],
            group_header: [],
            labels.StatusHeader: [],
            "Score": [],
            "Muted": [],
        }
        for group_key in sorted(groups):
            table[provider_header].append(
                framework.Provider or (provider.upper() if provider else "")
            )
            table[group_header].append(group_key)
            # An empty denominator (nothing scored) renders as a green 100%.
            if max_score_per_group[group_key] == 0:
                group_score = 100.0
                score_color = Fore.GREEN
            else:
                group_score = (
                    score_per_group[group_key] / max_score_per_group[group_key]
                ) * 100
                score_color = Fore.RED
            table["Score"].append(
                f"{Style.BRIGHT}{score_color}{group_score:.2f}%{Style.RESET_ALL}"
            )
            if groups[group_key]["FAIL"] > 0:
                table[labels.StatusHeader].append(
                    f"{Fore.RED}{labels.FailLabel}({groups[group_key]['FAIL']}){Style.RESET_ALL}"
                )
            else:
                table[labels.StatusHeader].append(
                    f"{Fore.GREEN}{labels.PassLabel}({groups[group_key]['PASS']}){Style.RESET_ALL}"
                )
            table["Muted"].append(
                f"{orange_color}{groups[group_key]['Muted']}{Style.RESET_ALL}"
            )
        if max_generic_score == 0:
            generic_threat_score = 100.0
        else:
            generic_threat_score = generic_score / max_generic_score * 100
        results_title = (
            labels.ResultsTitle
            or f"Framework {Fore.YELLOW}{compliance_framework_name.upper()}{Style.RESET_ALL} Results:"
        )
        print(f"\n{results_title}")
        print(f"\nGeneric Threat Score: {generic_threat_score:.2f}%")
        print(tabulate(table, headers="keys", tablefmt="rounded_grid"))
        footer = labels.FooterNote or (
            f"{Style.BRIGHT}\n=== Threat Score Guide ===\n"
            f"The lower the score, the higher the risk.{Style.RESET_ALL}\n"
            f"{Style.BRIGHT}(Only sections containing results appear, the score is calculated as the sum of the "
            f"level of risk * weight of the passed findings divided by the sum of the risk * weight of all the findings){Style.RESET_ALL}"
        )
        print(footer)
        print(f"\nDetailed results of {compliance_framework_name.upper()} are in:")
        print(
            f" - CSV: {output_directory}/compliance/{output_filename}_{compliance_framework_name}.csv"
        )
        if "json-ocsf" in (output_formats or []):
            print(
                f" - OCSF: {output_directory}/compliance/{output_filename}_{compliance_framework_name}.ocsf.json"
            )
        print()

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,448 @@
import json
from datetime import datetime, timezone
from types import SimpleNamespace
from py_ocsf_models.events.findings.compliance_finding import ComplianceFinding
from py_ocsf_models.events.findings.compliance_finding_type_id import (
ComplianceFindingTypeID,
)
from py_ocsf_models.objects.compliance_status import StatusID as ComplianceStatusID
from prowler.lib.check.compliance_models import (
AttributeMetadata,
ComplianceFramework,
OutputsConfig,
TableConfig,
UniversalComplianceRequirement,
)
from prowler.lib.outputs.compliance.universal.ocsf_compliance import (
OCSFComplianceOutput,
)
def _make_finding(check_id, status="PASS", provider="aws"):
"""Create a mock Finding with all fields needed by OCSFComplianceOutput."""
finding = SimpleNamespace()
finding.provider = provider
finding.account_uid = "123456789012"
finding.account_name = "test-account"
finding.account_email = ""
finding.account_organization_uid = "org-123"
finding.account_organization_name = "test-org"
finding.account_tags = {"env": "test"}
finding.region = "us-east-1"
finding.status = status
finding.status_extended = f"{check_id} is {status}"
finding.resource_uid = f"arn:aws:iam::123456789012:{check_id}"
finding.resource_name = check_id
finding.resource_details = "some details"
finding.resource_metadata = {}
finding.resource_tags = {"Name": "test"}
finding.partition = "aws"
finding.muted = False
finding.check_id = check_id
finding.uid = "test-finding-uid"
finding.timestamp = datetime(2025, 1, 15, 12, 0, 0, tzinfo=timezone.utc)
finding.prowler_version = "5.0.0"
finding.compliance = {}
finding.metadata = SimpleNamespace(
Provider=provider,
CheckID=check_id,
CheckTitle=f"Title for {check_id}",
CheckType=["test-type"],
Description=f"Description for {check_id}",
Severity="medium",
ServiceName="iam",
ResourceType="aws-iam-role",
Risk="test-risk",
RelatedUrl="https://example.com",
Remediation=SimpleNamespace(
Recommendation=SimpleNamespace(Text="Fix it", Url="https://fix.com"),
),
DependsOn=[],
RelatedTo=[],
Categories=["test"],
Notes="",
AdditionalURLs=[],
)
return finding
def _make_framework(requirements, attrs_metadata=None):
    """Build a minimal TestFW ComplianceFramework around *requirements*."""
    framework_kwargs = dict(
        Framework="TestFW",
        Name="Test Framework",
        Provider="AWS",
        Version="1.0",
        Description="Test framework",
        Requirements=requirements,
        AttributesMetadata=attrs_metadata,
        Outputs=OutputsConfig(Table_Config=TableConfig(GroupBy="Section")),
    )
    return ComplianceFramework(**framework_kwargs)
def _simple_requirement(req_id="REQ-1", checks=None):
    """Build a bare requirement; defaults to a single ``check_a`` mapping."""
    if checks is None:
        checks = ["check_a"]
    return UniversalComplianceRequirement(
        Id=req_id,
        Description=f"Description for {req_id}",
        Attributes={},
        Checks=checks,
    )
class TestOCSFComplianceOutput:
    """Tests for OCSFComplianceOutput: transformation of findings plus a
    universal compliance framework into OCSF ComplianceFinding events,
    including manual requirements, multi-provider Checks dicts, the
    ``unmapped`` payload, and AttributeMetadata-driven OCSF filtering."""

    def test_transform_basic(self):
        """One finding mapped to one requirement yields one ComplianceFinding."""
        fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
        findings = [_make_finding("check_a", "PASS")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        assert len(output.data) == 1
        assert isinstance(output.data[0], ComplianceFinding)
    def test_class_uid(self):
        """Compliance Finding events carry OCSF class_uid 2003."""
        fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
        findings = [_make_finding("check_a")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        assert output.data[0].class_uid == 2003
    def test_type_uid(self):
        """type_uid is the Create activity (class_uid * 100 + 1 = 200301)."""
        fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
        findings = [_make_finding("check_a")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        assert output.data[0].type_uid == ComplianceFindingTypeID.Create
        assert output.data[0].type_uid == 200301
    def test_compliance_object_fields(self):
        """Standards, requirements, control and status come from the framework/requirement."""
        fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
        findings = [_make_finding("check_a", "PASS")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        compliance = output.data[0].compliance
        assert compliance.standards == ["TestFW-1.0"]
        assert compliance.requirements == ["REQ-1"]
        assert compliance.control == "Description for REQ-1"
        assert compliance.status_id == ComplianceStatusID.Pass
    def test_check_object_fields(self):
        """The nested check object mirrors the finding's check id, title and status."""
        fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
        findings = [_make_finding("check_a", "FAIL")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        checks = output.data[0].compliance.checks
        assert len(checks) == 1
        assert checks[0].uid == "check_a"
        assert checks[0].name == "Title for check_a"
        assert checks[0].status == "FAIL"
        assert checks[0].status_id == ComplianceStatusID.Fail
    def test_finding_info_fields(self):
        """finding_info uid is the finding uid suffixed with the requirement id."""
        fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
        findings = [_make_finding("check_a")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        info = output.data[0].finding_info
        assert info.uid == "test-finding-uid-REQ-1"
        assert info.title == "REQ-1"
        assert info.desc == "Description for REQ-1"
    def test_metadata_fields(self):
        """Event metadata identifies Prowler as the product and the check as event_code."""
        fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
        findings = [_make_finding("check_a")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        metadata = output.data[0].metadata
        assert metadata.product.name == "Prowler"
        assert metadata.product.uid == "prowler"
        assert metadata.event_code == "check_a"
    def test_status_mapping_pass(self):
        """PASS findings map to ComplianceStatusID.Pass."""
        fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
        findings = [_make_finding("check_a", "PASS")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        assert output.data[0].compliance.status_id == ComplianceStatusID.Pass
    def test_status_mapping_fail(self):
        """FAIL findings map to ComplianceStatusID.Fail."""
        fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
        findings = [_make_finding("check_a", "FAIL")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        assert output.data[0].compliance.status_id == ComplianceStatusID.Fail
    def test_manual_requirement(self):
        """Requirements with no checks become MANUAL events with Unknown status."""
        req = _simple_requirement("MANUAL-1", checks=[])
        fw = _make_framework([req])
        findings = [_make_finding("check_a")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        assert len(output.data) == 1
        cf = output.data[0]
        assert cf.compliance.status_id == ComplianceStatusID.Unknown
        assert cf.status_code == "MANUAL"
        assert cf.finding_info.uid == "manual-MANUAL-1"
    def test_multi_provider_checks_dict(self):
        """Only the current provider's entry of a dict Checks is matched."""
        req = UniversalComplianceRequirement(
            Id="REQ-1",
            Description="Multi-provider req",
            Attributes={},
            Checks={"aws": ["check_a"], "azure": ["check_b"]},
        )
        fw = _make_framework([req])
        findings = [_make_finding("check_a", "PASS", provider="aws")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        assert len(output.data) == 1
        assert output.data[0].compliance.checks[0].uid == "check_a"
    def test_empty_findings(self):
        """No findings (and no manual requirements) produce no events."""
        fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
        output = OCSFComplianceOutput(findings=[], framework=fw, provider="aws")
        assert output.data == []
    def test_cloud_info_in_unmapped(self):
        """Cloud provider/account context is carried in the unmapped payload."""
        fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
        findings = [_make_finding("check_a", provider="aws")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        cf = output.data[0]
        assert cf.unmapped is not None
        assert cf.unmapped["cloud"]["provider"] == "aws"
        assert cf.unmapped["cloud"]["account"]["uid"] == "123456789012"
        assert cf.unmapped["cloud"]["account"]["name"] == "test-account"
    def test_resources_populated(self):
        """The finding's resource is attached as an OCSF resource object."""
        fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
        findings = [_make_finding("check_a")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        resources = output.data[0].resources
        assert len(resources) == 1
        assert resources[0].name == "check_a"
        assert resources[0].uid == "arn:aws:iam::123456789012:check_a"
        assert resources[0].type == "aws-iam-role"
    def test_batch_write_to_file(self, tmp_path):
        """batch_write_data_to_file emits a JSON list of OCSF events."""
        fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
        findings = [_make_finding("check_a", "PASS")]
        filepath = str(tmp_path / "compliance.ocsf.json")
        output = OCSFComplianceOutput(
            findings=findings, framework=fw, file_path=filepath, provider="aws"
        )
        output.batch_write_data_to_file()
        with open(filepath) as f:
            data = json.load(f)
        assert isinstance(data, list)
        assert len(data) == 1
        assert data[0]["class_uid"] == 2003
        assert data[0]["compliance"]["standards"] == ["TestFW-1.0"]
        assert data[0]["compliance"]["requirements"] == ["REQ-1"]
    def test_multiple_findings_same_requirement(self):
        """Each finding for the same requirement yields its own event."""
        fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
        findings = [
            _make_finding("check_a", "PASS"),
            _make_finding("check_a", "FAIL"),
        ]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        assert len(output.data) == 2
        statuses = [cf.compliance.status_id for cf in output.data]
        assert ComplianceStatusID.Pass in statuses
        assert ComplianceStatusID.Fail in statuses
    def test_requirement_attributes_in_unmapped(self):
        """Requirement attributes land under unmapped["requirement_attributes"]."""
        req = UniversalComplianceRequirement(
            Id="REQ-1",
            Description="Test requirement",
            Attributes={"Section": "IAM", "Profile": "Level 1"},
            Checks=["check_a"],
        )
        fw = _make_framework([req])
        findings = [_make_finding("check_a", "PASS")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        cf = output.data[0]
        assert cf.unmapped is not None
        assert "requirement_attributes" in cf.unmapped
        assert cf.unmapped["requirement_attributes"]["section"] == "IAM"
        assert cf.unmapped["requirement_attributes"]["profile"] == "Level 1"
    def test_requirement_attributes_keys_are_snake_case(self):
        """CamelCase attribute keys are converted to snake_case in unmapped."""
        req = UniversalComplianceRequirement(
            Id="REQ-1",
            Description="Test requirement",
            Attributes={"Section": "IAM", "CCMLite": "Yes", "SubSection": "1.1"},
            Checks=["check_a"],
        )
        fw = _make_framework([req])
        findings = [_make_finding("check_a", "PASS")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        attrs = output.data[0].unmapped["requirement_attributes"]
        assert "section" in attrs
        assert "ccm_lite" in attrs
        assert "sub_section" in attrs
    def test_requirement_attributes_empty_attrs_excluded(self):
        """Empty Attributes dicts do not produce a requirement_attributes key."""
        req = _simple_requirement("REQ-1", checks=["check_a"])
        fw = _make_framework([req])
        findings = [_make_finding("check_a")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        cf = output.data[0]
        # Cloud info is still present, but no requirement_attributes key
        assert cf.unmapped is not None
        assert "cloud" in cf.unmapped
        assert "requirement_attributes" not in cf.unmapped
    def test_manual_requirement_has_attributes_in_unmapped(self):
        """Manual events keep requirement attributes but have no cloud info."""
        req = UniversalComplianceRequirement(
            Id="MANUAL-1",
            Description="Manual check",
            Attributes={"Section": "Logging", "Type": "manual"},
            Checks=[],
        )
        fw = _make_framework([req])
        findings = [_make_finding("check_a")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        assert len(output.data) == 1
        cf = output.data[0]
        assert cf.unmapped is not None
        assert cf.unmapped["requirement_attributes"]["section"] == "Logging"
        assert cf.unmapped["requirement_attributes"]["type"] == "manual"
        # Manual findings have no cloud info (finding is None)
        assert "cloud" not in cf.unmapped
    def test_ocsf_metadata_filters_attributes(self):
        """Attributes with OCSF=False in metadata should be excluded from unmapped."""
        metadata = [
            AttributeMetadata(Key="Section", Type="str", OCSF=True),
            AttributeMetadata(Key="InternalNote", Type="str", OCSF=False),
            AttributeMetadata(Key="Profile", Type="str", OCSF=True),
        ]
        req = UniversalComplianceRequirement(
            Id="REQ-1",
            Description="Test",
            Attributes={
                "Section": "IAM",
                "InternalNote": "skip me",
                "Profile": "Level 1",
            },
            Checks=["check_a"],
        )
        fw = _make_framework([req], attrs_metadata=metadata)
        findings = [_make_finding("check_a", "PASS")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        attrs = output.data[0].unmapped["requirement_attributes"]
        assert "section" in attrs
        assert "profile" in attrs
        assert "internal_note" not in attrs
    def test_ocsf_metadata_all_false_excludes_all(self):
        """When all attributes have OCSF=False, requirement_attributes should be empty."""
        metadata = [
            AttributeMetadata(Key="Section", Type="str", OCSF=False),
        ]
        req = UniversalComplianceRequirement(
            Id="REQ-1",
            Description="Test",
            Attributes={"Section": "IAM"},
            Checks=["check_a"],
        )
        fw = _make_framework([req], attrs_metadata=metadata)
        findings = [_make_finding("check_a", "PASS")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        cf = output.data[0]
        assert cf.unmapped is not None
        # requirement_attributes should not appear since all attrs are filtered out
        assert "requirement_attributes" not in cf.unmapped
    def test_ocsf_no_metadata_includes_all(self):
        """Without AttributesMetadata, all attributes should be included (backward compat)."""
        req = UniversalComplianceRequirement(
            Id="REQ-1",
            Description="Test",
            Attributes={"Section": "IAM", "Custom": "value"},
            Checks=["check_a"],
        )
        fw = _make_framework([req], attrs_metadata=None)
        findings = [_make_finding("check_a", "PASS")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        attrs = output.data[0].unmapped["requirement_attributes"]
        assert "section" in attrs
        assert "custom" in attrs
    def test_ocsf_default_is_true(self):
        """OCSF defaults to True — attributes are included unless explicitly excluded."""
        metadata = [
            AttributeMetadata(Key="Section", Type="str"),
            AttributeMetadata(Key="Profile", Type="str"),
        ]
        req = UniversalComplianceRequirement(
            Id="REQ-1",
            Description="Test",
            Attributes={"Section": "IAM", "Profile": "Level 1"},
            Checks=["check_a"],
        )
        fw = _make_framework([req], attrs_metadata=metadata)
        findings = [_make_finding("check_a", "PASS")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        attrs = output.data[0].unmapped["requirement_attributes"]
        assert "section" in attrs
        assert "profile" in attrs
    def test_ocsf_filter_on_manual_requirements(self):
        """OCSF filtering should also apply to manual requirements."""
        metadata = [
            AttributeMetadata(Key="Section", Type="str", OCSF=True),
            AttributeMetadata(Key="InternalNote", Type="str", OCSF=False),
        ]
        req = UniversalComplianceRequirement(
            Id="MANUAL-1",
            Description="Manual",
            Attributes={"Section": "Logging", "InternalNote": "hidden"},
            Checks=[],
        )
        fw = _make_framework([req], attrs_metadata=metadata)
        findings = [_make_finding("check_a")]
        output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
        cf = output.data[0]
        assert cf.unmapped["requirement_attributes"]["section"] == "Logging"
        assert "internal_note" not in cf.unmapped["requirement_attributes"]

View File

@@ -0,0 +1,551 @@
from types import SimpleNamespace
from prowler.lib.check.compliance_models import (
AttributeMetadata,
ComplianceFramework,
OutputsConfig,
TableConfig,
UniversalComplianceRequirement,
)
from prowler.lib.outputs.compliance.universal.universal_output import (
UniversalComplianceOutput,
)
def _make_finding(check_id, status="PASS", compliance_map=None):
"""Create a mock Finding for output tests."""
finding = SimpleNamespace()
finding.provider = "aws"
finding.account_uid = "123456789012"
finding.account_name = "test-account"
finding.region = "us-east-1"
finding.status = status
finding.status_extended = f"{check_id} is {status}"
finding.resource_uid = f"arn:aws:iam::123456789012:{check_id}"
finding.resource_name = check_id
finding.muted = False
finding.check_id = check_id
finding.metadata = SimpleNamespace(
Provider="aws",
CheckID=check_id,
Severity="medium",
)
finding.compliance = compliance_map or {}
return finding
def _make_framework(requirements, attrs_metadata=None, table_config=None):
    """Build a TestFW framework; Outputs is set only when a table_config is given."""
    outputs = OutputsConfig(Table_Config=table_config) if table_config else None
    return ComplianceFramework(
        Framework="TestFW",
        Name="Test Framework",
        Provider="AWS",
        Version="1.0",
        Description="Test framework",
        Requirements=requirements,
        AttributesMetadata=attrs_metadata,
        Outputs=outputs,
    )
class TestDynamicCSVColumns:
    """Tests that CSV columns are generated dynamically from AttributesMetadata."""

    def test_columns_match_metadata(self, tmp_path):
        """One Requirements_Attributes_<Key> column per declared attribute."""
        reqs = [
            UniversalComplianceRequirement(
                Id="1.1",
                Description="test",
                Attributes={"Section": "IAM", "SubSection": "Auth"},
                Checks=["check_a"],
            ),
        ]
        metadata = [
            AttributeMetadata(Key="Section", Type="str"),
            AttributeMetadata(Key="SubSection", Type="str"),
        ]
        fw = _make_framework(reqs, metadata, TableConfig(GroupBy="Section"))
        findings = [
            _make_finding("check_a", "PASS", {"TestFW-1.0": ["1.1"]}),
        ]
        filepath = str(tmp_path / "test.csv")
        output = UniversalComplianceOutput(
            findings=findings,
            framework=fw,
            file_path=filepath,
        )
        assert len(output.data) == 1
        row_dict = output.data[0].dict()
        assert "Requirements_Attributes_Section" in row_dict
        assert "Requirements_Attributes_SubSection" in row_dict
        assert row_dict["Requirements_Attributes_Section"] == "IAM"
        assert row_dict["Requirements_Attributes_SubSection"] == "Auth"
class TestManualRequirements:
    """Requirements without any checks are emitted as MANUAL rows."""

    def test_manual_status(self, tmp_path):
        requirements = [
            UniversalComplianceRequirement(
                Id="1.1",
                Description="test",
                Attributes={"Section": "IAM"},
                Checks=["check_a"],
            ),
            UniversalComplianceRequirement(
                Id="manual-1",
                Description="manual check",
                Attributes={"Section": "Governance"},
                Checks=[],
            ),
        ]
        framework = _make_framework(
            requirements,
            [AttributeMetadata(Key="Section", Type="str")],
            TableConfig(GroupBy="Section"),
        )
        output = UniversalComplianceOutput(
            findings=[_make_finding("check_a", "PASS", {"TestFW-1.0": ["1.1"]})],
            framework=framework,
            file_path=str(tmp_path / "test.csv"),
        )
        # One row for the mapped finding plus one MANUAL row.
        assert len(output.data) == 2
        manual = [r.dict() for r in output.data if r.dict()["Status"] == "MANUAL"]
        assert len(manual) == 1
        assert manual[0]["Requirements_Id"] == "manual-1"
        assert manual[0]["ResourceId"] == "manual_check"
class TestMITREExtraColumns:
    """MITRE-specific requirement fields surface as extra CSV columns."""

    def test_mitre_columns_present(self, tmp_path):
        requirements = [
            UniversalComplianceRequirement(
                Id="T1190",
                Description="Exploit",
                Attributes={},
                Checks=["check_a"],
                Tactics=["Initial Access"],
                SubTechniques=[],
                Platforms=["IaaS"],
                TechniqueURL="https://attack.mitre.org/techniques/T1190/",
            ),
        ]
        # Group by the top-level Tactics field (underscore prefix = non-attribute).
        framework = _make_framework(requirements, None, TableConfig(GroupBy="_Tactics"))
        output = UniversalComplianceOutput(
            findings=[_make_finding("check_a", "PASS", {"TestFW-1.0": ["T1190"]})],
            framework=framework,
            file_path=str(tmp_path / "test.csv"),
        )
        assert len(output.data) == 1
        row = output.data[0].dict()
        assert "Requirements_Tactics" in row
        assert row["Requirements_Tactics"] == "Initial Access"
        assert "Requirements_TechniqueURL" in row
class TestCSVFileWrite:
    """batch_write_data_to_file persists rows with uppercase CSV headers."""

    def test_batch_write(self, tmp_path):
        requirements = [
            UniversalComplianceRequirement(
                Id="1.1",
                Description="test",
                Attributes={"Section": "IAM"},
                Checks=["check_a"],
            ),
        ]
        framework = _make_framework(
            requirements,
            [AttributeMetadata(Key="Section", Type="str")],
            TableConfig(GroupBy="Section"),
        )
        csv_path = str(tmp_path / "test.csv")
        output = UniversalComplianceOutput(
            findings=[_make_finding("check_a", "PASS", {"TestFW-1.0": ["1.1"]})],
            framework=framework,
            file_path=csv_path,
        )
        output.batch_write_data_to_file()
        # Verify file was created and has content
        with open(csv_path, "r") as handle:
            written = handle.read()
        assert "PROVIDER" in written  # Headers are uppercase
        assert "REQUIREMENTS_ATTRIBUTES_SECTION" in written
        assert "IAM" in written
class TestNoFindings:
    """An empty findings list yields no CSV rows."""

    def test_empty_findings_no_data(self, tmp_path):
        requirements = [
            UniversalComplianceRequirement(
                Id="1.1",
                Description="test",
                Attributes={"Section": "IAM"},
                Checks=["check_a"],
            ),
        ]
        framework = _make_framework(requirements, None, TableConfig(GroupBy="Section"))
        output = UniversalComplianceOutput(
            findings=[],
            framework=framework,
            file_path=str(tmp_path / "test.csv"),
        )
        assert len(output.data) == 0
class TestMultiProviderOutput:
    """Dict-valued Checks (provider -> check ids) drive per-provider filtering."""

    def test_dict_checks_filtered_by_provider(self, tmp_path):
        """Only checks for the given provider appear in CSV output."""
        requirements = [
            UniversalComplianceRequirement(
                Id="1.1",
                Description="test",
                Attributes={"Section": "IAM"},
                Checks={"aws": ["check_a"], "azure": ["check_b"]},
            ),
        ]
        framework = ComplianceFramework(
            Framework="MultiCloud",
            Name="Multi",
            Version="1.0",
            Description="Test multi-provider",
            Requirements=requirements,
            AttributesMetadata=[AttributeMetadata(Key="Section", Type="str")],
            Outputs=OutputsConfig(Table_Config=TableConfig(GroupBy="Section")),
        )
        output = UniversalComplianceOutput(
            findings=[
                _make_finding("check_a", "PASS", {"MultiCloud-1.0": ["1.1"]}),
                _make_finding("check_b", "FAIL", {"MultiCloud-1.0": ["1.1"]}),
            ],
            framework=framework,
            file_path=str(tmp_path / "test.csv"),
            provider="aws",
        )
        # Only check_a should match (it's the AWS check)
        assert len(output.data) == 1
        assert output.data[0].dict()["Requirements_Attributes_Section"] == "IAM"

    def test_no_provider_includes_all(self, tmp_path):
        """Without provider filter, all checks from all providers are included."""
        requirements = [
            UniversalComplianceRequirement(
                Id="1.1",
                Description="test",
                Attributes={"Section": "IAM"},
                Checks={"aws": ["check_a"], "azure": ["check_b"]},
            ),
        ]
        framework = ComplianceFramework(
            Framework="MultiCloud",
            Name="Multi",
            Version="1.0",
            Description="Test multi-provider",
            Requirements=requirements,
            AttributesMetadata=[AttributeMetadata(Key="Section", Type="str")],
            Outputs=OutputsConfig(Table_Config=TableConfig(GroupBy="Section")),
        )
        output = UniversalComplianceOutput(
            findings=[
                _make_finding("check_a", "PASS", {"MultiCloud-1.0": ["1.1"]}),
                _make_finding("check_b", "FAIL", {"MultiCloud-1.0": ["1.1"]}),
            ],
            framework=framework,
            file_path=str(tmp_path / "test.csv"),
        )
        # Both checks should be included without provider filter
        assert len(output.data) == 2

    def test_empty_dict_checks_is_manual(self, tmp_path):
        """Requirement with empty dict Checks is treated as manual."""
        requirements = [
            UniversalComplianceRequirement(
                Id="manual-1",
                Description="manual check",
                Attributes={"Section": "Governance"},
                Checks={},
            ),
        ]
        framework = ComplianceFramework(
            Framework="MultiCloud",
            Name="Multi",
            Version="1.0",
            Description="Test",
            Requirements=requirements,
            AttributesMetadata=[AttributeMetadata(Key="Section", Type="str")],
            Outputs=OutputsConfig(Table_Config=TableConfig(GroupBy="Section")),
        )
        output = UniversalComplianceOutput(
            findings=[_make_finding("other_check", "PASS", {})],
            framework=framework,
            file_path=str(tmp_path / "test.csv"),
            provider="aws",
        )
        manual = [r.dict() for r in output.data if r.dict()["Status"] == "MANUAL"]
        assert len(manual) == 1
        assert manual[0]["Requirements_Id"] == "manual-1"
class TestCSVExclude:
    """Attributes flagged CSV=False never become CSV columns."""

    def test_csv_false_excludes_column(self, tmp_path):
        requirements = [
            UniversalComplianceRequirement(
                Id="1.1",
                Description="test",
                Attributes={"Section": "IAM", "Internal": "hidden"},
                Checks=["check_a"],
            ),
        ]
        attrs_metadata = [
            AttributeMetadata(Key="Section", Type="str", CSV=True),
            AttributeMetadata(Key="Internal", Type="str", CSV=False),
        ]
        framework = _make_framework(
            requirements, attrs_metadata, TableConfig(GroupBy="Section")
        )
        output = UniversalComplianceOutput(
            findings=[_make_finding("check_a", "PASS", {"TestFW-1.0": ["1.1"]})],
            framework=framework,
            file_path=str(tmp_path / "test.csv"),
        )
        row = output.data[0].dict()
        assert "Requirements_Attributes_Section" in row
        assert "Requirements_Attributes_Internal" not in row
def _make_provider_finding(provider, check_id="check_a", status="PASS"):
    """Create a mock Finding mapped to TestFW-1.0/1.1, overriding its provider."""
    result = _make_finding(check_id, status, {"TestFW-1.0": ["1.1"]})
    result.provider = provider
    return result
def _simple_framework():
    """One-requirement framework grouped by Section, shared by header tests."""
    requirement = UniversalComplianceRequirement(
        Id="1.1",
        Description="test",
        Attributes={"Section": "IAM"},
        Checks=["check_a"],
    )
    return _make_framework(
        [requirement],
        [AttributeMetadata(Key="Section", Type="str")],
        TableConfig(GroupBy="Section"),
    )
class TestProviderHeaders:
    """Provider-specific identifier/location columns in the universal CSV model."""

    @staticmethod
    def _first_row(provider_name, tmp_path, filename="test.csv", filter_provider=True):
        """Build an output for one finding of ``provider_name`` and return row 0 as a dict.

        When ``filter_provider`` is False the ``provider`` kwarg is omitted,
        exercising the no-filter default behavior.
        """
        extra = {"provider": provider_name} if filter_provider else {}
        output = UniversalComplianceOutput(
            findings=[_make_provider_finding(provider_name)],
            framework=_simple_framework(),
            file_path=str(tmp_path / filename),
            **extra,
        )
        return output.data[0].dict()

    def test_aws_headers(self, tmp_path):
        row = self._first_row("aws", tmp_path)
        assert "AccountId" in row
        assert "Region" in row
        assert row["AccountId"] == "123456789012"
        assert row["Region"] == "us-east-1"

    def test_azure_headers(self, tmp_path):
        row = self._first_row("azure", tmp_path)
        assert "SubscriptionId" in row
        assert "Location" in row
        assert row["SubscriptionId"] == "123456789012"
        assert row["Location"] == "us-east-1"

    def test_gcp_headers(self, tmp_path):
        row = self._first_row("gcp", tmp_path)
        assert "ProjectId" in row
        assert "Location" in row
        assert row["ProjectId"] == "123456789012"

    def test_kubernetes_headers(self, tmp_path):
        row = self._first_row("kubernetes", tmp_path)
        assert "Context" in row
        assert "Namespace" in row
        # Kubernetes Context maps to account_name
        assert row["Context"] == "test-account"
        assert row["Namespace"] == "us-east-1"

    def test_github_headers(self, tmp_path):
        row = self._first_row("github", tmp_path)
        assert "Account_Name" in row
        assert "Account_Id" in row
        # GitHub: Account_Name (pos 3) from account_name, Account_Id (pos 4) from account_uid
        assert row["Account_Name"] == "test-account"
        assert row["Account_Id"] == "123456789012"
        # Verify column order matches legacy (Account_Name before Account_Id)
        columns = list(row.keys())
        assert columns.index("Account_Name") < columns.index("Account_Id")

    def test_unknown_provider_defaults(self, tmp_path):
        row = self._first_row("unknown", tmp_path)
        assert "AccountId" in row
        assert "Region" in row

    def test_none_provider_defaults(self, tmp_path):
        # Omitting the provider filter falls back to the default AWS-style headers.
        row = self._first_row("aws", tmp_path, filter_provider=False)
        assert "AccountId" in row
        assert "Region" in row

    def test_csv_write_azure_headers(self, tmp_path):
        csv_path = str(tmp_path / "test.csv")
        output = UniversalComplianceOutput(
            findings=[_make_provider_finding("azure")],
            framework=_simple_framework(),
            file_path=csv_path,
            provider="azure",
        )
        output.batch_write_data_to_file()
        with open(csv_path, "r") as handle:
            written = handle.read()
        assert "SUBSCRIPTIONID" in written
        assert "LOCATION" in written
        # Should NOT have the default AccountId/Region headers
        assert "ACCOUNTID" not in written

    def test_column_order_matches_legacy(self, tmp_path):
        """Verify that the base column order matches the legacy per-provider models.
        Legacy models all define: Provider, Description, <col3>, <col4>, AssessmentDate, ...
        The universal output must preserve this exact order for backward compatibility.
        """
        # Expected column order per provider (positions 3 and 4 after Provider, Description)
        legacy_order = {
            "aws": ("AccountId", "Region"),
            "azure": ("SubscriptionId", "Location"),
            "gcp": ("ProjectId", "Location"),
            "kubernetes": ("Context", "Namespace"),
            "m365": ("TenantId", "Location"),
            "github": ("Account_Name", "Account_Id"),
            "oraclecloud": ("TenancyId", "Region"),
            "alibabacloud": ("AccountId", "Region"),
            "nhn": ("AccountId", "Region"),
        }
        for provider_name, (expected_col3, expected_col4) in legacy_order.items():
            row = self._first_row(
                provider_name, tmp_path, filename=f"test_{provider_name}.csv"
            )
            keys = list(row.keys())
            assert keys[0] == "Provider", f"{provider_name}: col 1 should be Provider"
            assert (
                keys[1] == "Description"
            ), f"{provider_name}: col 2 should be Description"
            assert (
                keys[2] == expected_col3
            ), f"{provider_name}: col 3 should be {expected_col3}, got {keys[2]}"
            assert (
                keys[3] == expected_col4
            ), f"{provider_name}: col 4 should be {expected_col4}, got {keys[3]}"
            assert (
                keys[4] == "AssessmentDate"
            ), f"{provider_name}: col 5 should be AssessmentDate"

View File

@@ -0,0 +1,384 @@
from types import SimpleNamespace
from unittest.mock import MagicMock
from prowler.lib.check.compliance_models import (
ComplianceFramework,
OutputsConfig,
ScoringConfig,
SplitByConfig,
TableConfig,
TableLabels,
UniversalComplianceRequirement,
)
from prowler.lib.outputs.compliance.universal.universal_table import (
_build_requirement_check_map,
_get_group_key,
get_universal_table,
)
def _make_finding(check_id, status="PASS", muted=False):
"""Create a mock finding for table tests."""
finding = SimpleNamespace()
finding.check_metadata = SimpleNamespace(CheckID=check_id)
finding.status = status
finding.muted = muted
return finding
def _make_framework(requirements, table_config, provider="AWS"):
    """Assemble a test ComplianceFramework for table-rendering tests."""
    outputs = OutputsConfig(Table_Config=table_config) if table_config else None
    return ComplianceFramework(
        Framework="TestFW",
        Name="Test Framework",
        Provider=provider,
        Version="1.0",
        Description="Test",
        Requirements=requirements,
        Outputs=outputs,
    )
class TestBuildRequirementCheckMap:
    """_build_requirement_check_map: check-id -> requirements lookup."""

    def test_basic(self):
        requirements = [
            UniversalComplianceRequirement(
                Id="1.1",
                Description="test",
                Attributes={"Section": "IAM"},
                Checks=["check_a", "check_b"],
            ),
            UniversalComplianceRequirement(
                Id="1.2",
                Description="test2",
                Attributes={"Section": "IAM"},
                Checks=["check_b", "check_c"],
            ),
        ]
        framework = _make_framework(requirements, TableConfig(GroupBy="Section"))
        mapping = _build_requirement_check_map(framework)
        assert "check_a" in mapping
        # check_b is shared by both requirements
        assert len(mapping["check_b"]) == 2
        assert "check_c" in mapping

    def test_dict_checks_no_provider_filter(self):
        requirements = [
            UniversalComplianceRequirement(
                Id="1.1",
                Description="test",
                Attributes={"Section": "IAM"},
                Checks={"aws": ["check_a"], "azure": ["check_b"]},
            ),
        ]
        framework = _make_framework(requirements, TableConfig(GroupBy="Section"))
        mapping = _build_requirement_check_map(framework)
        assert "check_a" in mapping
        assert "check_b" in mapping

    def test_dict_checks_filtered_by_provider(self):
        requirements = [
            UniversalComplianceRequirement(
                Id="1.1",
                Description="test",
                Attributes={"Section": "IAM"},
                Checks={"aws": ["check_a"], "azure": ["check_b"]},
            ),
        ]
        framework = _make_framework(requirements, TableConfig(GroupBy="Section"))
        mapping = _build_requirement_check_map(framework, provider="aws")
        assert "check_a" in mapping
        assert "check_b" not in mapping

    def test_dict_checks_provider_not_present(self):
        requirements = [
            UniversalComplianceRequirement(
                Id="1.1",
                Description="test",
                Attributes={"Section": "IAM"},
                Checks={"aws": ["check_a"], "azure": ["check_b"]},
            ),
        ]
        framework = _make_framework(requirements, TableConfig(GroupBy="Section"))
        mapping = _build_requirement_check_map(framework, provider="gcp")
        assert len(mapping) == 0
class TestGetGroupKey:
    """_get_group_key resolves grouping values from a requirement."""

    def test_normal_field(self):
        requirement = UniversalComplianceRequirement(
            Id="1.1",
            Description="test",
            Attributes={"Section": "IAM"},
            Checks=[],
        )
        assert _get_group_key(requirement, "Section") == ["IAM"]

    def test_tactics(self):
        requirement = UniversalComplianceRequirement(
            Id="T1190",
            Description="test",
            Attributes={},
            Checks=[],
            Tactics=["Initial Access", "Execution"],
        )
        # "_Tactics" addresses the top-level Tactics field, not an attribute.
        assert _get_group_key(requirement, "_Tactics") == [
            "Initial Access",
            "Execution",
        ]
class TestGroupedMode:
    """Grouped table mode prints one section per GroupBy value."""

    def test_grouped_rendering(self, capsys):
        requirements = [
            UniversalComplianceRequirement(
                Id="1.1",
                Description="test",
                Attributes={"Section": "IAM"},
                Checks=["check_a"],
            ),
            UniversalComplianceRequirement(
                Id="2.1",
                Description="test2",
                Attributes={"Section": "Logging"},
                Checks=["check_b"],
            ),
        ]
        framework = _make_framework(requirements, TableConfig(GroupBy="Section"))
        metadata_by_check = {
            "check_a": MagicMock(Compliance=[]),
            "check_b": MagicMock(Compliance=[]),
        }
        get_universal_table(
            [_make_finding("check_a", "PASS"), _make_finding("check_b", "FAIL")],
            metadata_by_check,
            "test_fw",
            "output",
            "/tmp",
            False,
            framework=framework,
        )
        rendered = capsys.readouterr().out
        assert "IAM" in rendered
        assert "Logging" in rendered
        assert "PASS" in rendered
        assert "FAIL" in rendered
class TestSplitMode:
    """SplitBy renders one column per split value (e.g. CIS Level 1/Level 2)."""

    def test_split_rendering(self, capsys):
        requirements = [
            UniversalComplianceRequirement(
                Id="1.1",
                Description="test",
                Attributes={"Section": "Storage", "Profile": "Level 1"},
                Checks=["check_a"],
            ),
            UniversalComplianceRequirement(
                Id="1.2",
                Description="test2",
                Attributes={"Section": "Storage", "Profile": "Level 2"},
                Checks=["check_b"],
            ),
        ]
        config = TableConfig(
            GroupBy="Section",
            SplitBy=SplitByConfig(Field="Profile", Values=["Level 1", "Level 2"]),
        )
        framework = _make_framework(requirements, config)
        metadata_by_check = {
            "check_a": MagicMock(Compliance=[]),
            "check_b": MagicMock(Compliance=[]),
        }
        get_universal_table(
            [_make_finding("check_a", "PASS"), _make_finding("check_b", "FAIL")],
            metadata_by_check,
            "test_fw",
            "output",
            "/tmp",
            False,
            framework=framework,
        )
        rendered = capsys.readouterr().out
        assert "Storage" in rendered
        assert "Level 1" in rendered
        assert "Level 2" in rendered
class TestScoredMode:
    """Scoring config adds weighted Score / Threat Score output to the table."""

    def test_scored_rendering(self, capsys):
        requirements = [
            UniversalComplianceRequirement(
                Id="1.1",
                Description="test",
                Attributes={"Section": "IAM", "LevelOfRisk": 5, "Weight": 100},
                Checks=["check_a"],
            ),
            UniversalComplianceRequirement(
                Id="1.2",
                Description="test2",
                Attributes={"Section": "IAM", "LevelOfRisk": 3, "Weight": 50},
                Checks=["check_b"],
            ),
        ]
        config = TableConfig(
            GroupBy="Section",
            Scoring=ScoringConfig(RiskField="LevelOfRisk", WeightField="Weight"),
        )
        framework = _make_framework(requirements, config)
        metadata_by_check = {
            "check_a": MagicMock(Compliance=[]),
            "check_b": MagicMock(Compliance=[]),
        }
        get_universal_table(
            [_make_finding("check_a", "PASS"), _make_finding("check_b", "FAIL")],
            metadata_by_check,
            "test_fw",
            "output",
            "/tmp",
            False,
            framework=framework,
        )
        rendered = capsys.readouterr().out
        assert "IAM" in rendered
        assert "Score" in rendered
        assert "Threat Score" in rendered
class TestCustomLabels:
    """Custom TableLabels (e.g. ENS Spanish wording) replace the defaults."""

    def test_ens_spanish_labels(self, capsys):
        requirements = [
            UniversalComplianceRequirement(
                Id="1.1",
                Description="test",
                Attributes={"Marco": "operacional"},
                Checks=["check_a"],
            ),
            UniversalComplianceRequirement(
                Id="1.2",
                Description="test2",
                Attributes={"Marco": "organizativo"},
                Checks=["check_b"],
            ),
        ]
        config = TableConfig(
            GroupBy="Marco",
            Labels=TableLabels(
                PassLabel="CUMPLE",
                FailLabel="NO CUMPLE",
                ProviderHeader="Proveedor",
                Title="Estado de Cumplimiento",
            ),
        )
        framework = _make_framework(requirements, config)
        metadata_by_check = {
            "check_a": MagicMock(Compliance=[]),
            "check_b": MagicMock(Compliance=[]),
        }
        get_universal_table(
            [_make_finding("check_a", "PASS"), _make_finding("check_b", "FAIL")],
            metadata_by_check,
            "test_fw",
            "output",
            "/tmp",
            False,
            framework=framework,
        )
        rendered = capsys.readouterr().out
        assert "CUMPLE" in rendered
        assert "Estado de Cumplimiento" in rendered
class TestMultiProviderDictChecks:
    """Dict-valued Checks are filtered by provider before matching findings."""

    def test_only_aws_checks_matched(self, capsys):
        """With dict Checks and provider='aws', only AWS checks match findings."""
        requirements = [
            UniversalComplianceRequirement(
                Id="1.1",
                Description="test",
                Attributes={"Section": "IAM"},
                Checks={"aws": ["check_a"], "azure": ["check_b"]},
            ),
            UniversalComplianceRequirement(
                Id="2.1",
                Description="test2",
                Attributes={"Section": "Logging"},
                Checks={"aws": ["check_c"], "gcp": ["check_d"]},
            ),
        ]
        framework = ComplianceFramework(
            Framework="MultiCloud",
            Name="Multi",
            Description="Test",
            Requirements=requirements,
            Outputs=OutputsConfig(Table_Config=TableConfig(GroupBy="Section")),
        )
        sample_findings = [
            _make_finding("check_a", "PASS"),
            _make_finding("check_b", "FAIL"),  # Azure check, should be ignored
            _make_finding("check_c", "PASS"),
        ]
        metadata_by_check = {
            "check_a": MagicMock(Compliance=[]),
            "check_b": MagicMock(Compliance=[]),
            "check_c": MagicMock(Compliance=[]),
        }
        get_universal_table(
            sample_findings,
            metadata_by_check,
            "multi_cloud",
            "output",
            "/tmp",
            False,
            framework=framework,
            provider="aws",
        )
        rendered = capsys.readouterr().out
        assert "IAM" in rendered
        assert "Logging" in rendered
        # check_b (azure) should not have been counted as FAIL for AWS
        assert "PASS" in rendered
class TestNoTableConfig:
    """get_universal_table is a silent no-op without a table config or framework."""

    def test_returns_early_without_table_config(self, capsys):
        framework = ComplianceFramework(
            Framework="TestFW",
            Name="Test",
            Provider="AWS",
            Description="Test",
            Requirements=[],
        )
        get_universal_table([], {}, "test", "out", "/tmp", False, framework=framework)
        assert capsys.readouterr().out == ""

    def test_returns_early_without_framework(self, capsys):
        get_universal_table([], {}, "test", "out", "/tmp", False, framework=None)
        assert capsys.readouterr().out == ""