mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-03-22 03:08:23 +00:00
Compare commits
2 Commits
master
...
feat/prowl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6534494b29 | ||
|
|
0766a4e1dd |
@@ -1,9 +1,10 @@
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from enum import Enum
|
||||
from typing import Optional, Union
|
||||
|
||||
from pydantic.v1 import BaseModel, ValidationError, root_validator
|
||||
from pydantic.v1 import BaseModel, Field, ValidationError, root_validator
|
||||
|
||||
from prowler.lib.check.utils import list_compliance_modules
|
||||
from prowler.lib.logger import logger
|
||||
@@ -429,3 +430,491 @@ def load_compliance_framework(
|
||||
sys.exit(1)
|
||||
else:
|
||||
return compliance_framework
|
||||
|
||||
|
||||
# ─── Universal Compliance Schema Models (Phase 1-3) ─────────────────────────
|
||||
|
||||
|
||||
class AttributeMetadata(BaseModel):
|
||||
"""Schema descriptor for a single attribute field in a universal compliance framework."""
|
||||
|
||||
Key: str
|
||||
Label: Optional[str] = None
|
||||
Type: str = "str" # str, int, float, list_str, list_dict, bool
|
||||
Enum: Optional[list] = None
|
||||
CSV: bool = True
|
||||
OCSF: bool = True
|
||||
Required: bool = False
|
||||
EnumDisplay: Optional[dict] = None # enum_value -> EnumValueDisplay dict
|
||||
EnumOrder: Optional[list] = None # explicit ordering of enum values
|
||||
ChartLabel: Optional[str] = None # axis label when used in charts
|
||||
|
||||
|
||||
class SplitByConfig(BaseModel):
|
||||
"""Column-splitting configuration (e.g. CIS Level 1/Level 2)."""
|
||||
|
||||
Field: str
|
||||
Values: list
|
||||
|
||||
|
||||
class ScoringConfig(BaseModel):
|
||||
"""Weighted scoring configuration (e.g. ThreatScore)."""
|
||||
|
||||
RiskField: str
|
||||
WeightField: str
|
||||
|
||||
|
||||
class TableLabels(BaseModel):
|
||||
"""Custom pass/fail labels for console table rendering."""
|
||||
|
||||
PassLabel: str = "PASS"
|
||||
FailLabel: str = "FAIL"
|
||||
ProviderHeader: str = "Provider"
|
||||
GroupHeader: Optional[str] = None
|
||||
StatusHeader: str = "Status"
|
||||
Title: Optional[str] = None
|
||||
ResultsTitle: Optional[str] = None
|
||||
FooterNote: Optional[str] = None
|
||||
|
||||
|
||||
class TableConfig(BaseModel):
|
||||
"""Declarative rendering instructions for the console compliance table."""
|
||||
|
||||
GroupBy: str
|
||||
SplitBy: Optional[SplitByConfig] = None
|
||||
Scoring: Optional[ScoringConfig] = None
|
||||
Labels: Optional[TableLabels] = None
|
||||
|
||||
|
||||
class EnumValueDisplay(BaseModel):
|
||||
"""Per-enum-value visual metadata for PDF rendering.
|
||||
|
||||
Replaces hardcoded DIMENSION_MAPPING, TIPO_ICONS, nivel colors.
|
||||
"""
|
||||
|
||||
Label: Optional[str] = None # "Trazabilidad"
|
||||
Abbreviation: Optional[str] = None # "T"
|
||||
Color: Optional[str] = None # "#4286F4"
|
||||
Icon: Optional[str] = None # emoji
|
||||
|
||||
|
||||
class ChartConfig(BaseModel):
|
||||
"""Declarative chart description for PDF reports."""
|
||||
|
||||
Id: str
|
||||
Type: str # vertical_bar | horizontal_bar | radar
|
||||
GroupBy: str # attribute key to group by
|
||||
Title: Optional[str] = None
|
||||
XLabel: Optional[str] = None
|
||||
YLabel: Optional[str] = None
|
||||
ValueSource: str = "compliance_percent"
|
||||
ColorMode: str = "by_value" # by_value | fixed | by_group
|
||||
FixedColor: Optional[str] = None
|
||||
|
||||
|
||||
class ScoringFormula(BaseModel):
|
||||
"""Weighted scoring formula (e.g. ThreatScore)."""
|
||||
|
||||
RiskField: str # "LevelOfRisk"
|
||||
WeightField: str # "Weight"
|
||||
RiskBoostFactor: float = 0.25 # rfac = 1 + factor * risk_level
|
||||
|
||||
|
||||
class CriticalRequirementsFilter(BaseModel):
|
||||
"""Filter for critical requirements section in PDF reports."""
|
||||
|
||||
FilterField: str # "LevelOfRisk"
|
||||
MinValue: Optional[int] = None # 4 (int-based filter)
|
||||
FilterValue: Optional[str] = None # "alto" (string-based filter)
|
||||
StatusFilter: str = "FAIL"
|
||||
Title: Optional[str] = None # "Critical Failed Requirements"
|
||||
|
||||
|
||||
class ReportFilter(BaseModel):
|
||||
"""Default report filtering for PDF generation."""
|
||||
|
||||
OnlyFailed: bool = True
|
||||
IncludeManual: bool = False
|
||||
|
||||
|
||||
class I18nLabels(BaseModel):
|
||||
"""Localized labels for PDF report rendering."""
|
||||
|
||||
ReportTitle: Optional[str] = None
|
||||
PageLabel: str = "Page"
|
||||
PoweredBy: str = "Powered by Prowler"
|
||||
FrameworkLabel: str = "Framework:"
|
||||
VersionLabel: str = "Version:"
|
||||
ProviderLabel: str = "Provider:"
|
||||
DescriptionLabel: str = "Description:"
|
||||
ComplianceScoreLabel: str = "Compliance Score by Sections"
|
||||
RequirementsIndexLabel: str = "Requirements Index"
|
||||
DetailedFindingsLabel: str = "Detailed Findings"
|
||||
|
||||
|
||||
class PDFConfig(BaseModel):
|
||||
"""Declarative PDF report configuration.
|
||||
|
||||
Drives the API report generator from JSON data instead of hardcoded
|
||||
Python config. Colors are hex strings (e.g. '#336699').
|
||||
"""
|
||||
|
||||
Language: str = "en"
|
||||
LogoFilename: Optional[str] = None
|
||||
PrimaryColor: Optional[str] = None
|
||||
SecondaryColor: Optional[str] = None
|
||||
BgColor: Optional[str] = None
|
||||
Sections: Optional[list] = None
|
||||
SectionShortNames: Optional[dict] = None
|
||||
GroupByField: Optional[str] = None
|
||||
SubGroupByField: Optional[str] = None
|
||||
SectionTitles: Optional[dict] = None
|
||||
Charts: Optional[list] = None
|
||||
Scoring: Optional[ScoringFormula] = None
|
||||
CriticalFilter: Optional[CriticalRequirementsFilter] = None
|
||||
Filter: Optional[ReportFilter] = None
|
||||
Labels: Optional[I18nLabels] = None
|
||||
|
||||
|
||||
class UniversalComplianceRequirement(BaseModel):
|
||||
"""Universal requirement with flat dict-based attributes."""
|
||||
|
||||
Id: str
|
||||
Description: str
|
||||
Name: Optional[str] = None
|
||||
Attributes: dict = Field(default_factory=dict)
|
||||
Checks: Union[list, dict] = Field(default_factory=list)
|
||||
Tactics: Optional[list] = None
|
||||
SubTechniques: Optional[list] = None
|
||||
Platforms: Optional[list] = None
|
||||
TechniqueURL: Optional[str] = None
|
||||
|
||||
|
||||
class OutputsConfig(BaseModel):
|
||||
"""Container for output-related configuration (table, PDF, etc.)."""
|
||||
|
||||
class Config:
|
||||
allow_population_by_field_name = True
|
||||
|
||||
Table_Config: Optional[TableConfig] = Field(None, alias="TableConfig")
|
||||
PDF_Config: Optional[PDFConfig] = Field(None, alias="PDFConfig")
|
||||
|
||||
|
||||
class ComplianceFramework(BaseModel):
|
||||
"""Universal top-level container for any compliance framework.
|
||||
|
||||
Provider may be explicit (single-provider JSON) or derived from Checks
|
||||
keys when Checks is a dict keyed by provider.
|
||||
"""
|
||||
|
||||
Framework: str
|
||||
Name: str
|
||||
Provider: Optional[str] = None
|
||||
Version: Optional[str] = None
|
||||
Description: str
|
||||
Icon: Optional[str] = None
|
||||
Requirements: list[UniversalComplianceRequirement]
|
||||
AttributesMetadata: Optional[list[AttributeMetadata]] = None
|
||||
Outputs: Optional[OutputsConfig] = None
|
||||
|
||||
@root_validator(pre=True)
|
||||
# noqa: F841 - since vulture raises unused variable 'cls'
|
||||
def migrate_legacy_output_fields(cls, values): # noqa: F841
|
||||
"""Move top-level TableConfig/PDFConfig into Outputs for backward compat."""
|
||||
tc = values.pop("TableConfig", None)
|
||||
pc = values.pop("PDFConfig", None)
|
||||
if tc is not None or pc is not None:
|
||||
outputs = values.get("Outputs") or {}
|
||||
if isinstance(outputs, OutputsConfig):
|
||||
outputs = outputs.dict()
|
||||
if tc is not None and "TableConfig" not in outputs:
|
||||
outputs["TableConfig"] = tc
|
||||
if pc is not None and "PDFConfig" not in outputs:
|
||||
outputs["PDFConfig"] = pc
|
||||
values["Outputs"] = outputs
|
||||
return values
|
||||
|
||||
@root_validator
|
||||
# noqa: F841 - since vulture raises unused variable 'cls'
|
||||
def validate_attributes_against_metadata(cls, values): # noqa: F841
|
||||
"""Validate every Requirement's Attributes dict against AttributesMetadata.
|
||||
|
||||
Checks:
|
||||
- Required keys (Required=True) must be present in each Requirement.
|
||||
- Enum-constrained keys must have a value within the declared Enum list.
|
||||
- Basic type validation (int, float, bool) for non-None values.
|
||||
"""
|
||||
metadata = values.get("AttributesMetadata")
|
||||
requirements = values.get("Requirements", [])
|
||||
if not metadata:
|
||||
return values
|
||||
|
||||
required_keys = {m.Key for m in metadata if m.Required}
|
||||
valid_keys = {m.Key for m in metadata}
|
||||
enum_map = {m.Key: m.Enum for m in metadata if m.Enum}
|
||||
type_map = {m.Key: m.Type for m in metadata}
|
||||
|
||||
type_checks = {
|
||||
"int": int,
|
||||
"float": (int, float),
|
||||
"bool": bool,
|
||||
}
|
||||
|
||||
errors = []
|
||||
for req in requirements:
|
||||
attrs = req.Attributes
|
||||
|
||||
# Required keys
|
||||
for key in required_keys:
|
||||
if key not in attrs or attrs[key] is None:
|
||||
errors.append(
|
||||
f"Requirement '{req.Id}': missing required attribute '{key}'"
|
||||
)
|
||||
|
||||
# Enum validation
|
||||
for key, allowed in enum_map.items():
|
||||
if key in attrs and attrs[key] is not None:
|
||||
if attrs[key] not in allowed:
|
||||
errors.append(
|
||||
f"Requirement '{req.Id}': attribute '{key}' value "
|
||||
f"'{attrs[key]}' not in {allowed}"
|
||||
)
|
||||
|
||||
# Type validation for non-string types
|
||||
for key in attrs:
|
||||
if key not in valid_keys or attrs[key] is None:
|
||||
continue
|
||||
expected_type = type_map.get(key, "str")
|
||||
py_type = type_checks.get(expected_type)
|
||||
if py_type and not isinstance(attrs[key], py_type):
|
||||
errors.append(
|
||||
f"Requirement '{req.Id}': attribute '{key}' expected "
|
||||
f"type {expected_type}, got {type(attrs[key]).__name__}"
|
||||
)
|
||||
|
||||
if errors:
|
||||
detail = "\n ".join(errors)
|
||||
raise ValueError(f"AttributesMetadata validation failed:\n {detail}")
|
||||
|
||||
return values
|
||||
|
||||
def get_providers(self) -> list:
|
||||
"""Derive the set of providers this framework supports.
|
||||
|
||||
Inspects Checks keys across all requirements. Falls back to the
|
||||
explicit Provider field for single-provider frameworks.
|
||||
"""
|
||||
providers = set()
|
||||
for req in self.Requirements:
|
||||
if isinstance(req.Checks, dict):
|
||||
providers.update(k.lower() for k in req.Checks.keys())
|
||||
if self.Provider and not providers:
|
||||
providers.add(self.Provider.lower())
|
||||
return sorted(providers)
|
||||
|
||||
def supports_provider(self, provider: str) -> bool:
|
||||
"""Return True if this framework has checks for the given provider."""
|
||||
provider_lower = provider.lower()
|
||||
for req in self.Requirements:
|
||||
if isinstance(req.Checks, dict):
|
||||
if provider_lower in (k.lower() for k in req.Checks.keys()):
|
||||
return True
|
||||
elif isinstance(req.Checks, list) and req.Checks:
|
||||
# List-style checks: rely on explicit Provider field
|
||||
if self.Provider and self.Provider.lower() == provider_lower:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
# ─── Legacy-to-Universal Adapter (Phase 2) ──────────────────────────────────
|
||||
|
||||
|
||||
def _infer_attribute_metadata(legacy: Compliance) -> Optional[list[AttributeMetadata]]:
|
||||
"""Introspect the first requirement's attribute model to build AttributesMetadata."""
|
||||
try:
|
||||
if not legacy.Requirements:
|
||||
return None
|
||||
|
||||
first_req = legacy.Requirements[0]
|
||||
|
||||
# MITRE requirements have Tactics at top level, not in Attributes
|
||||
if isinstance(first_req, Mitre_Requirement):
|
||||
return None
|
||||
|
||||
if not first_req.Attributes:
|
||||
return None
|
||||
|
||||
sample_attr = first_req.Attributes[0]
|
||||
metadata = []
|
||||
|
||||
for field_name, field_obj in sample_attr.__fields__.items():
|
||||
field_type = field_obj.outer_type_
|
||||
type_str = "str"
|
||||
enum_values = None
|
||||
|
||||
origin = getattr(field_type, "__origin__", None)
|
||||
if field_type is int:
|
||||
type_str = "int"
|
||||
elif field_type is float:
|
||||
type_str = "float"
|
||||
elif field_type is bool:
|
||||
type_str = "bool"
|
||||
elif origin is list:
|
||||
args = getattr(field_type, "__args__", ())
|
||||
if args and args[0] is dict:
|
||||
type_str = "list_dict"
|
||||
else:
|
||||
type_str = "list_str"
|
||||
elif isinstance(field_type, type) and issubclass(field_type, Enum):
|
||||
type_str = "str"
|
||||
enum_values = [e.value for e in field_type]
|
||||
|
||||
metadata.append(
|
||||
AttributeMetadata(
|
||||
Key=field_name,
|
||||
Type=type_str,
|
||||
Enum=enum_values,
|
||||
Required=field_obj.required,
|
||||
)
|
||||
)
|
||||
|
||||
return metadata
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def adapt_legacy_to_universal(legacy: Compliance) -> ComplianceFramework:
|
||||
"""Convert a legacy Compliance object to a ComplianceFramework."""
|
||||
universal_requirements = []
|
||||
|
||||
for req in legacy.Requirements:
|
||||
if isinstance(req, Mitre_Requirement):
|
||||
# For MITRE, promote special fields and store raw attributes
|
||||
raw_attrs = [attr.dict() for attr in req.Attributes]
|
||||
attrs = {"_raw_attributes": raw_attrs}
|
||||
universal_requirements.append(
|
||||
UniversalComplianceRequirement(
|
||||
Id=req.Id,
|
||||
Description=req.Description,
|
||||
Name=req.Name,
|
||||
Attributes=attrs,
|
||||
Checks=req.Checks,
|
||||
Tactics=req.Tactics,
|
||||
SubTechniques=req.SubTechniques,
|
||||
Platforms=req.Platforms,
|
||||
TechniqueURL=req.TechniqueURL,
|
||||
)
|
||||
)
|
||||
else:
|
||||
# Standard requirement: flatten first attribute to dict
|
||||
if req.Attributes:
|
||||
attrs = req.Attributes[0].dict()
|
||||
else:
|
||||
attrs = {}
|
||||
universal_requirements.append(
|
||||
UniversalComplianceRequirement(
|
||||
Id=req.Id,
|
||||
Description=req.Description,
|
||||
Name=req.Name,
|
||||
Attributes=attrs,
|
||||
Checks=req.Checks,
|
||||
)
|
||||
)
|
||||
|
||||
inferred_metadata = _infer_attribute_metadata(legacy)
|
||||
|
||||
return ComplianceFramework(
|
||||
Framework=legacy.Framework,
|
||||
Name=legacy.Name,
|
||||
Provider=legacy.Provider,
|
||||
Version=legacy.Version,
|
||||
Description=legacy.Description,
|
||||
Requirements=universal_requirements,
|
||||
AttributesMetadata=inferred_metadata,
|
||||
)
|
||||
|
||||
|
||||
def load_compliance_framework_universal(path: str) -> ComplianceFramework:
|
||||
"""Load a compliance JSON as a ComplianceFramework, handling both new and legacy formats."""
|
||||
try:
|
||||
with open(path, "r") as f:
|
||||
data = json.load(f)
|
||||
|
||||
if "AttributesMetadata" in data:
|
||||
# New universal format — parse directly
|
||||
return ComplianceFramework(**data)
|
||||
else:
|
||||
# Legacy format — parse as Compliance, then adapt
|
||||
legacy = Compliance(**data)
|
||||
return adapt_legacy_to_universal(legacy)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to load universal compliance framework from {path}: "
|
||||
f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}"
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def _load_jsons_from_dir(dir_path: str, provider: str, bulk: dict) -> None:
|
||||
"""Scan *dir_path* for JSON files and add matching frameworks to *bulk*."""
|
||||
for filename in os.listdir(dir_path):
|
||||
file_path = os.path.join(dir_path, filename)
|
||||
if not (
|
||||
os.path.isfile(file_path)
|
||||
and filename.endswith(".json")
|
||||
and os.stat(file_path).st_size > 0
|
||||
):
|
||||
continue
|
||||
framework_name = filename.split(".json")[0]
|
||||
if framework_name in bulk:
|
||||
continue
|
||||
fw = load_compliance_framework_universal(file_path)
|
||||
if fw is None:
|
||||
continue
|
||||
if fw.Provider and fw.Provider.lower() == provider.lower():
|
||||
bulk[framework_name] = fw
|
||||
elif fw.supports_provider(provider):
|
||||
bulk[framework_name] = fw
|
||||
|
||||
|
||||
def get_bulk_compliance_frameworks_universal(provider: str) -> dict:
|
||||
"""Bulk load all compliance frameworks relevant to the given provider.
|
||||
|
||||
Scans:
|
||||
|
||||
1. The **top-level** ``prowler/compliance/`` directory for multi-provider
|
||||
JSONs (``Checks`` keyed by provider, no ``Provider`` field).
|
||||
2. Every **provider sub-directory** (``prowler/compliance/{p}/``) so that
|
||||
single-provider JSONs are also picked up.
|
||||
|
||||
A framework is included when its explicit ``Provider`` matches
|
||||
(case-insensitive) **or** any requirement has dict-style ``Checks``
|
||||
with a key for *provider*.
|
||||
"""
|
||||
bulk = {}
|
||||
try:
|
||||
available_modules = list_compliance_modules()
|
||||
|
||||
# Resolve the compliance root once (parent of provider sub-dirs).
|
||||
compliance_root = None
|
||||
seen_paths = set()
|
||||
|
||||
for module in available_modules:
|
||||
dir_path = f"{module.module_finder.path}/{module.name.split('.')[-1]}"
|
||||
if not os.path.isdir(dir_path) or dir_path in seen_paths:
|
||||
continue
|
||||
seen_paths.add(dir_path)
|
||||
|
||||
# Remember the root the first time we see a valid sub-dir.
|
||||
if compliance_root is None:
|
||||
compliance_root = module.module_finder.path
|
||||
|
||||
_load_jsons_from_dir(dir_path, provider, bulk)
|
||||
|
||||
# Also scan top-level compliance/ for provider-agnostic JSONs.
|
||||
if compliance_root and os.path.isdir(compliance_root):
|
||||
_load_jsons_from_dir(compliance_root, provider, bulk)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}")
|
||||
return bulk
|
||||
|
||||
401
prowler/lib/outputs/compliance/universal/ocsf_compliance.py
Normal file
401
prowler/lib/outputs/compliance/universal/ocsf_compliance.py
Normal file
@@ -0,0 +1,401 @@
|
||||
import os
|
||||
from datetime import datetime
|
||||
from typing import List
|
||||
|
||||
from py_ocsf_models.events.base_event import SeverityID
|
||||
from py_ocsf_models.events.base_event import StatusID as EventStatusID
|
||||
from py_ocsf_models.events.findings.compliance_finding import ComplianceFinding
|
||||
from py_ocsf_models.events.findings.compliance_finding_type_id import (
|
||||
ComplianceFindingTypeID,
|
||||
)
|
||||
from py_ocsf_models.events.findings.finding import ActivityID, FindingInformation
|
||||
from py_ocsf_models.objects.check import Check
|
||||
from py_ocsf_models.objects.compliance import Compliance
|
||||
from py_ocsf_models.objects.compliance_status import StatusID as ComplianceStatusID
|
||||
from py_ocsf_models.objects.group import Group
|
||||
from py_ocsf_models.objects.metadata import Metadata
|
||||
from py_ocsf_models.objects.product import Product
|
||||
from py_ocsf_models.objects.resource_details import ResourceDetails
|
||||
|
||||
from prowler.config.config import prowler_version
|
||||
from prowler.lib.check.compliance_models import ComplianceFramework
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.lib.outputs.finding import Finding
|
||||
from prowler.lib.outputs.ocsf.ocsf import OCSF
|
||||
from prowler.lib.outputs.utils import unroll_dict_to_list
|
||||
from prowler.lib.utils.utils import open_file
|
||||
|
||||
PROWLER_TO_COMPLIANCE_STATUS = {
|
||||
"PASS": ComplianceStatusID.Pass,
|
||||
"FAIL": ComplianceStatusID.Fail,
|
||||
"MANUAL": ComplianceStatusID.Unknown,
|
||||
}
|
||||
|
||||
|
||||
def _to_snake_case(name: str) -> str:
|
||||
"""Convert a PascalCase or camelCase string to snake_case."""
|
||||
import re
|
||||
|
||||
# Insert underscore before uppercase letters preceded by lowercase
|
||||
s = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
|
||||
# Insert underscore between consecutive uppercase and following lowercase
|
||||
s = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", s)
|
||||
return s.lower()
|
||||
|
||||
|
||||
def _build_requirement_attrs(requirement, framework) -> dict:
|
||||
"""Build a dict with requirement attributes for the unmapped section.
|
||||
|
||||
Keys are normalized to snake_case for OCSF consistency.
|
||||
Only includes attributes whose AttributeMetadata has OCSF=True.
|
||||
When no metadata is declared, all attributes are included.
|
||||
"""
|
||||
attrs = requirement.Attributes
|
||||
if not attrs:
|
||||
return {}
|
||||
|
||||
# Build set of keys allowed for OCSF output
|
||||
metadata = framework.AttributesMetadata
|
||||
if metadata:
|
||||
ocsf_keys = {m.Key for m in metadata if m.OCSF}
|
||||
else:
|
||||
ocsf_keys = None # No metadata → include all
|
||||
|
||||
result = {}
|
||||
for key, value in attrs.items():
|
||||
if ocsf_keys is not None and key not in ocsf_keys:
|
||||
continue
|
||||
result[_to_snake_case(key)] = value
|
||||
return result
|
||||
|
||||
|
||||
class OCSFComplianceOutput:
|
||||
"""Produces OCSF ComplianceFinding (class_uid 2003) events from
|
||||
universal compliance framework data.
|
||||
|
||||
Each finding × requirement combination produces one ComplianceFinding event
|
||||
with structured Compliance and Check objects.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
findings: list,
|
||||
framework: ComplianceFramework,
|
||||
file_path: str = None,
|
||||
from_cli: bool = True,
|
||||
provider: str = None,
|
||||
) -> None:
|
||||
self._data = []
|
||||
self._file_descriptor = None
|
||||
self.file_path = file_path
|
||||
self._from_cli = from_cli
|
||||
self._provider = provider
|
||||
self.close_file = False
|
||||
|
||||
if findings:
|
||||
compliance_name = (
|
||||
framework.Framework + "-" + framework.Version
|
||||
if framework.Version
|
||||
else framework.Framework
|
||||
)
|
||||
self._transform(findings, framework, compliance_name)
|
||||
if not self._file_descriptor and file_path:
|
||||
self._create_file_descriptor(file_path)
|
||||
|
||||
@property
|
||||
def data(self):
|
||||
return self._data
|
||||
|
||||
def _transform(
|
||||
self,
|
||||
findings: List[Finding],
|
||||
framework: ComplianceFramework,
|
||||
compliance_name: str,
|
||||
) -> None:
|
||||
# Build check -> requirements map (same logic as UniversalComplianceOutput)
|
||||
check_req_map = {}
|
||||
for req in framework.Requirements:
|
||||
checks = req.Checks
|
||||
if isinstance(checks, dict):
|
||||
if self._provider:
|
||||
all_checks = checks.get(self._provider.lower(), [])
|
||||
else:
|
||||
all_checks = []
|
||||
for check_list in checks.values():
|
||||
all_checks.extend(check_list)
|
||||
else:
|
||||
all_checks = checks
|
||||
for check_id in all_checks:
|
||||
check_req_map.setdefault(check_id, []).append(req)
|
||||
|
||||
for finding in findings:
|
||||
if finding.check_id in check_req_map:
|
||||
for req in check_req_map[finding.check_id]:
|
||||
cf = self._build_compliance_finding(
|
||||
finding, framework, req, compliance_name
|
||||
)
|
||||
if cf:
|
||||
self._data.append(cf)
|
||||
|
||||
# Manual requirements (no checks or empty for current provider)
|
||||
for req in framework.Requirements:
|
||||
checks = req.Checks
|
||||
if isinstance(checks, dict):
|
||||
if self._provider:
|
||||
has_checks = bool(checks.get(self._provider.lower(), []))
|
||||
else:
|
||||
has_checks = any(checks.values())
|
||||
else:
|
||||
has_checks = bool(checks)
|
||||
|
||||
if not has_checks:
|
||||
cf = self._build_manual_compliance_finding(
|
||||
framework, req, compliance_name
|
||||
)
|
||||
if cf:
|
||||
self._data.append(cf)
|
||||
|
||||
def _build_unmapped(self, finding, requirement, framework) -> dict:
|
||||
"""Build the unmapped dict with cloud info and requirement attributes."""
|
||||
unmapped = {}
|
||||
|
||||
# Cloud info (from finding, when available)
|
||||
if finding and getattr(finding, "provider", None) != "kubernetes":
|
||||
unmapped["cloud"] = {
|
||||
"provider": finding.provider,
|
||||
"region": finding.region,
|
||||
"account": {
|
||||
"uid": finding.account_uid,
|
||||
"name": finding.account_name,
|
||||
},
|
||||
"org": {
|
||||
"uid": finding.account_organization_uid,
|
||||
"name": finding.account_organization_name,
|
||||
},
|
||||
}
|
||||
|
||||
# Requirement attributes
|
||||
req_attrs = _build_requirement_attrs(requirement, framework)
|
||||
if req_attrs:
|
||||
unmapped["requirement_attributes"] = req_attrs
|
||||
|
||||
return unmapped or None
|
||||
|
||||
def _build_compliance_finding(
|
||||
self,
|
||||
finding: Finding,
|
||||
framework: ComplianceFramework,
|
||||
requirement,
|
||||
compliance_name: str,
|
||||
) -> ComplianceFinding:
|
||||
try:
|
||||
compliance_status = PROWLER_TO_COMPLIANCE_STATUS.get(
|
||||
finding.status, ComplianceStatusID.Unknown
|
||||
)
|
||||
check_status = PROWLER_TO_COMPLIANCE_STATUS.get(
|
||||
finding.status, ComplianceStatusID.Unknown
|
||||
)
|
||||
|
||||
finding_severity = getattr(
|
||||
SeverityID,
|
||||
finding.metadata.Severity.capitalize(),
|
||||
SeverityID.Unknown,
|
||||
)
|
||||
event_status = OCSF.get_finding_status_id(finding.muted)
|
||||
|
||||
time_value = (
|
||||
int(finding.timestamp.timestamp())
|
||||
if isinstance(finding.timestamp, datetime)
|
||||
else finding.timestamp
|
||||
)
|
||||
|
||||
cf = ComplianceFinding(
|
||||
activity_id=ActivityID.Create.value,
|
||||
activity_name=ActivityID.Create.name,
|
||||
compliance=Compliance(
|
||||
standards=[compliance_name],
|
||||
requirements=[requirement.Id],
|
||||
control=requirement.Description,
|
||||
status_id=compliance_status,
|
||||
checks=[
|
||||
Check(
|
||||
uid=finding.check_id,
|
||||
name=finding.metadata.CheckTitle,
|
||||
desc=finding.metadata.Description,
|
||||
status=finding.status,
|
||||
status_id=check_status,
|
||||
)
|
||||
],
|
||||
),
|
||||
finding_info=FindingInformation(
|
||||
uid=f"{finding.uid}-{requirement.Id}",
|
||||
title=requirement.Id,
|
||||
desc=requirement.Description,
|
||||
created_time=time_value,
|
||||
created_time_dt=(
|
||||
finding.timestamp
|
||||
if isinstance(finding.timestamp, datetime)
|
||||
else None
|
||||
),
|
||||
),
|
||||
message=finding.status_extended,
|
||||
metadata=Metadata(
|
||||
event_code=finding.check_id,
|
||||
product=Product(
|
||||
uid="prowler",
|
||||
name="Prowler",
|
||||
vendor_name="Prowler",
|
||||
version=finding.prowler_version,
|
||||
),
|
||||
profiles=(
|
||||
["cloud", "datetime"]
|
||||
if finding.provider != "kubernetes"
|
||||
else ["container", "datetime"]
|
||||
),
|
||||
tenant_uid=finding.account_organization_uid,
|
||||
),
|
||||
resources=[
|
||||
ResourceDetails(
|
||||
labels=unroll_dict_to_list(finding.resource_tags),
|
||||
name=finding.resource_name,
|
||||
uid=finding.resource_uid,
|
||||
group=Group(name=finding.metadata.ServiceName),
|
||||
type=finding.metadata.ResourceType,
|
||||
cloud_partition=(
|
||||
finding.partition
|
||||
if finding.provider != "kubernetes"
|
||||
else None
|
||||
),
|
||||
region=(
|
||||
finding.region if finding.provider != "kubernetes" else None
|
||||
),
|
||||
namespace=(
|
||||
finding.region.replace("namespace: ", "")
|
||||
if finding.provider == "kubernetes"
|
||||
else None
|
||||
),
|
||||
data={
|
||||
"details": finding.resource_details,
|
||||
"metadata": finding.resource_metadata,
|
||||
},
|
||||
)
|
||||
],
|
||||
severity_id=finding_severity.value,
|
||||
severity=finding_severity.name,
|
||||
status_id=event_status.value,
|
||||
status=event_status.name,
|
||||
status_code=finding.status,
|
||||
status_detail=finding.status_extended,
|
||||
time=time_value,
|
||||
time_dt=(
|
||||
finding.timestamp
|
||||
if isinstance(finding.timestamp, datetime)
|
||||
else None
|
||||
),
|
||||
type_uid=ComplianceFindingTypeID.Create,
|
||||
type_name=f"Compliance Finding: {ComplianceFindingTypeID.Create.name}",
|
||||
unmapped=self._build_unmapped(finding, requirement, framework),
|
||||
)
|
||||
|
||||
return cf
|
||||
except Exception as e:
|
||||
logger.debug(f"Skipping OCSF compliance finding for {requirement.Id}: {e}")
|
||||
return None
|
||||
|
||||
def _build_manual_compliance_finding(
|
||||
self,
|
||||
framework: ComplianceFramework,
|
||||
requirement,
|
||||
compliance_name: str,
|
||||
) -> ComplianceFinding:
|
||||
try:
|
||||
from prowler.config.config import timestamp as config_timestamp
|
||||
|
||||
time_value = int(config_timestamp.timestamp())
|
||||
|
||||
return ComplianceFinding(
|
||||
activity_id=ActivityID.Create.value,
|
||||
activity_name=ActivityID.Create.name,
|
||||
compliance=Compliance(
|
||||
standards=[compliance_name],
|
||||
requirements=[requirement.Id],
|
||||
control=requirement.Description,
|
||||
status_id=ComplianceStatusID.Unknown,
|
||||
),
|
||||
finding_info=FindingInformation(
|
||||
uid=f"manual-{requirement.Id}",
|
||||
title=requirement.Id,
|
||||
desc=requirement.Description,
|
||||
created_time=time_value,
|
||||
),
|
||||
message="Manual check",
|
||||
metadata=Metadata(
|
||||
event_code="manual",
|
||||
product=Product(
|
||||
uid="prowler",
|
||||
name="Prowler",
|
||||
vendor_name="Prowler",
|
||||
version=prowler_version,
|
||||
),
|
||||
),
|
||||
severity_id=SeverityID.Informational.value,
|
||||
severity=SeverityID.Informational.name,
|
||||
status_id=EventStatusID.New.value,
|
||||
status=EventStatusID.New.name,
|
||||
status_code="MANUAL",
|
||||
status_detail="Manual check",
|
||||
time=time_value,
|
||||
type_uid=ComplianceFindingTypeID.Create,
|
||||
type_name=f"Compliance Finding: {ComplianceFindingTypeID.Create.name}",
|
||||
unmapped=self._build_unmapped(None, requirement, framework),
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
f"Skipping manual OCSF compliance finding for {requirement.Id}: {e}"
|
||||
)
|
||||
return None
|
||||
|
||||
def _create_file_descriptor(self, file_path: str) -> None:
|
||||
try:
|
||||
self._file_descriptor = open_file(file_path, "a")
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
def batch_write_data_to_file(self) -> None:
|
||||
"""Write ComplianceFinding events to a JSON array file."""
|
||||
try:
|
||||
if (
|
||||
getattr(self, "_file_descriptor", None)
|
||||
and not self._file_descriptor.closed
|
||||
and self._data
|
||||
):
|
||||
if self._file_descriptor.tell() == 0:
|
||||
self._file_descriptor.write("[")
|
||||
for finding in self._data:
|
||||
try:
|
||||
if hasattr(finding, "model_dump_json"):
|
||||
json_output = finding.model_dump_json(
|
||||
exclude_none=True, indent=4
|
||||
)
|
||||
else:
|
||||
json_output = finding.json(exclude_none=True, indent=4)
|
||||
self._file_descriptor.write(json_output)
|
||||
self._file_descriptor.write(",")
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
if self.close_file or self._from_cli:
|
||||
if self._file_descriptor.tell() != 1:
|
||||
self._file_descriptor.seek(
|
||||
self._file_descriptor.tell() - 1, os.SEEK_SET
|
||||
)
|
||||
self._file_descriptor.truncate()
|
||||
self._file_descriptor.write("]")
|
||||
self._file_descriptor.close()
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
298
prowler/lib/outputs/compliance/universal/universal_output.py
Normal file
298
prowler/lib/outputs/compliance/universal/universal_output.py
Normal file
@@ -0,0 +1,298 @@
|
||||
from csv import DictWriter
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from pydantic.v1 import create_model
|
||||
|
||||
from prowler.config.config import timestamp
|
||||
from prowler.lib.check.compliance_models import ComplianceFramework
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.lib.outputs.finding import Finding
|
||||
from prowler.lib.utils.utils import open_file
|
||||
|
||||
PROVIDER_HEADER_MAP = {
|
||||
"aws": ("AccountId", "account_uid", "Region", "region"),
|
||||
"azure": ("SubscriptionId", "account_uid", "Location", "region"),
|
||||
"gcp": ("ProjectId", "account_uid", "Location", "region"),
|
||||
"kubernetes": ("Context", "account_name", "Namespace", "region"),
|
||||
"m365": ("TenantId", "account_uid", "Location", "region"),
|
||||
"github": ("Account_Name", "account_name", "Account_Id", "account_uid"),
|
||||
"oraclecloud": ("TenancyId", "account_uid", "Region", "region"),
|
||||
"alibabacloud": ("AccountId", "account_uid", "Region", "region"),
|
||||
"nhn": ("AccountId", "account_uid", "Region", "region"),
|
||||
}
|
||||
_DEFAULT_HEADERS = ("AccountId", "account_uid", "Region", "region")
|
||||
|
||||
|
||||
class UniversalComplianceOutput:
|
||||
"""Universal compliance CSV output driven by ComplianceFramework metadata.
|
||||
|
||||
Dynamically builds a Pydantic row model from AttributesMetadata so that
|
||||
CSV columns match the framework's declared attribute fields.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
findings: list,
|
||||
framework: ComplianceFramework,
|
||||
file_path: str = None,
|
||||
from_cli: bool = True,
|
||||
provider: str = None,
|
||||
) -> None:
|
||||
self._data = []
|
||||
self._file_descriptor = None
|
||||
self.file_path = file_path
|
||||
self._from_cli = from_cli
|
||||
self._provider = provider
|
||||
self.close_file = False
|
||||
|
||||
if file_path:
|
||||
path_obj = Path(file_path)
|
||||
self._file_extension = path_obj.suffix if path_obj.suffix else ""
|
||||
|
||||
if findings:
|
||||
self._row_model = self._build_row_model(framework)
|
||||
compliance_name = (
|
||||
framework.Framework + "-" + framework.Version
|
||||
if framework.Version
|
||||
else framework.Framework
|
||||
)
|
||||
self._transform(findings, framework, compliance_name)
|
||||
if not self._file_descriptor and file_path:
|
||||
self._create_file_descriptor(file_path)
|
||||
|
||||
@property
|
||||
def data(self):
|
||||
return self._data
|
||||
|
||||
def _build_row_model(self, framework: ComplianceFramework):
|
||||
"""Build a dynamic Pydantic model from AttributesMetadata."""
|
||||
acct_header, acct_field, loc_header, loc_field = PROVIDER_HEADER_MAP.get(
|
||||
(self._provider or "").lower(), _DEFAULT_HEADERS
|
||||
)
|
||||
self._acct_header = acct_header
|
||||
self._acct_field = acct_field
|
||||
self._loc_header = loc_header
|
||||
self._loc_field = loc_field
|
||||
|
||||
# Base fields present in every compliance CSV
|
||||
fields = {
|
||||
"Provider": (str, ...),
|
||||
"Description": (str, ...),
|
||||
acct_header: (str, ...),
|
||||
loc_header: (str, ...),
|
||||
"AssessmentDate": (str, ...),
|
||||
"Requirements_Id": (str, ...),
|
||||
"Requirements_Description": (str, ...),
|
||||
}
|
||||
|
||||
# Dynamic attribute columns from metadata
|
||||
if framework.AttributesMetadata:
|
||||
for attr_meta in framework.AttributesMetadata:
|
||||
if not attr_meta.CSV:
|
||||
continue
|
||||
field_name = f"Requirements_Attributes_{attr_meta.Key}"
|
||||
# Map type strings to Python types
|
||||
type_map = {
|
||||
"str": Optional[str],
|
||||
"int": Optional[int],
|
||||
"float": Optional[float],
|
||||
"bool": Optional[bool],
|
||||
"list_str": Optional[str], # Serialized as joined string
|
||||
"list_dict": Optional[str], # Serialized as string
|
||||
}
|
||||
py_type = type_map.get(attr_meta.Type, Optional[str])
|
||||
fields[field_name] = (py_type, None)
|
||||
|
||||
# Check if any requirement has MITRE fields
|
||||
has_mitre = any(req.Tactics for req in framework.Requirements if req.Tactics)
|
||||
if has_mitre:
|
||||
fields["Requirements_Tactics"] = (Optional[str], None)
|
||||
fields["Requirements_SubTechniques"] = (Optional[str], None)
|
||||
fields["Requirements_Platforms"] = (Optional[str], None)
|
||||
fields["Requirements_TechniqueURL"] = (Optional[str], None)
|
||||
|
||||
# Trailing fields
|
||||
fields["Status"] = (str, ...)
|
||||
fields["StatusExtended"] = (str, ...)
|
||||
fields["ResourceId"] = (str, ...)
|
||||
fields["ResourceName"] = (str, ...)
|
||||
fields["CheckId"] = (str, ...)
|
||||
fields["Muted"] = (bool, ...)
|
||||
fields["Framework"] = (str, ...)
|
||||
fields["Name"] = (str, ...)
|
||||
|
||||
return create_model("UniversalComplianceRow", **fields)
|
||||
|
||||
def _serialize_attr_value(self, value):
|
||||
"""Serialize attribute values for CSV."""
|
||||
if isinstance(value, list):
|
||||
if value and isinstance(value[0], dict):
|
||||
return str(value)
|
||||
return " | ".join(str(v) for v in value)
|
||||
return value
|
||||
|
||||
def _build_row(self, finding, framework, requirement, is_manual=False):
|
||||
"""Build a single row dict for a finding + requirement combination."""
|
||||
row = {
|
||||
"Provider": (
|
||||
finding.provider
|
||||
if not is_manual
|
||||
else (framework.Provider or self._provider or "").lower()
|
||||
),
|
||||
"Description": framework.Description,
|
||||
self._acct_header: (
|
||||
getattr(finding, self._acct_field, "") if not is_manual else ""
|
||||
),
|
||||
self._loc_header: (
|
||||
getattr(finding, self._loc_field, "") if not is_manual else ""
|
||||
),
|
||||
"AssessmentDate": str(timestamp),
|
||||
"Requirements_Id": requirement.Id,
|
||||
"Requirements_Description": requirement.Description,
|
||||
}
|
||||
|
||||
# Add dynamic attribute columns
|
||||
if framework.AttributesMetadata:
|
||||
for attr_meta in framework.AttributesMetadata:
|
||||
if not attr_meta.CSV:
|
||||
continue
|
||||
field_name = f"Requirements_Attributes_{attr_meta.Key}"
|
||||
raw_val = requirement.Attributes.get(attr_meta.Key)
|
||||
row[field_name] = (
|
||||
self._serialize_attr_value(raw_val) if raw_val is not None else None
|
||||
)
|
||||
|
||||
# MITRE fields
|
||||
if requirement.Tactics:
|
||||
row["Requirements_Tactics"] = (
|
||||
" | ".join(requirement.Tactics) if requirement.Tactics else None
|
||||
)
|
||||
row["Requirements_SubTechniques"] = (
|
||||
" | ".join(requirement.SubTechniques)
|
||||
if requirement.SubTechniques
|
||||
else None
|
||||
)
|
||||
row["Requirements_Platforms"] = (
|
||||
" | ".join(requirement.Platforms) if requirement.Platforms else None
|
||||
)
|
||||
row["Requirements_TechniqueURL"] = requirement.TechniqueURL
|
||||
|
||||
row["Status"] = finding.status if not is_manual else "MANUAL"
|
||||
row["StatusExtended"] = (
|
||||
finding.status_extended if not is_manual else "Manual check"
|
||||
)
|
||||
row["ResourceId"] = finding.resource_uid if not is_manual else "manual_check"
|
||||
row["ResourceName"] = finding.resource_name if not is_manual else "Manual check"
|
||||
row["CheckId"] = finding.check_id if not is_manual else "manual"
|
||||
row["Muted"] = finding.muted if not is_manual else False
|
||||
row["Framework"] = framework.Framework
|
||||
row["Name"] = framework.Name
|
||||
|
||||
return row
|
||||
|
||||
def _transform(
|
||||
self,
|
||||
findings: list[Finding],
|
||||
framework: ComplianceFramework,
|
||||
compliance_name: str,
|
||||
) -> None:
|
||||
"""Transform findings into universal compliance CSV rows."""
|
||||
# Build check -> requirements map (filtered by provider for dict Checks)
|
||||
check_req_map = {}
|
||||
for req in framework.Requirements:
|
||||
checks = req.Checks
|
||||
if isinstance(checks, dict):
|
||||
if self._provider:
|
||||
all_checks = checks.get(self._provider.lower(), [])
|
||||
else:
|
||||
all_checks = []
|
||||
for check_list in checks.values():
|
||||
all_checks.extend(check_list)
|
||||
else:
|
||||
all_checks = checks
|
||||
for check_id in all_checks:
|
||||
if check_id not in check_req_map:
|
||||
check_req_map[check_id] = []
|
||||
check_req_map[check_id].append(req)
|
||||
|
||||
# Process findings using the provider-filtered check_req_map.
|
||||
# This ensures that for multi-provider dict Checks, only the checks
|
||||
# belonging to the current provider produce output rows.
|
||||
for finding in findings:
|
||||
check_id = finding.check_id
|
||||
if check_id in check_req_map:
|
||||
for req in check_req_map[check_id]:
|
||||
row = self._build_row(finding, framework, req)
|
||||
try:
|
||||
self._data.append(self._row_model(**row))
|
||||
except Exception as e:
|
||||
logger.debug(f"Skipping row for {req.Id}: {e}")
|
||||
|
||||
# Manual requirements (no checks or empty dict)
|
||||
for req in framework.Requirements:
|
||||
checks = req.Checks
|
||||
if isinstance(checks, dict):
|
||||
if self._provider:
|
||||
has_checks = bool(checks.get(self._provider.lower(), []))
|
||||
else:
|
||||
has_checks = any(checks.values())
|
||||
else:
|
||||
has_checks = bool(checks)
|
||||
|
||||
if not has_checks:
|
||||
# Use a dummy finding-like namespace for manual rows
|
||||
row = self._build_row(
|
||||
_ManualFindingStub(), framework, req, is_manual=True
|
||||
)
|
||||
try:
|
||||
self._data.append(self._row_model(**row))
|
||||
except Exception as e:
|
||||
logger.debug(f"Skipping manual row for {req.Id}: {e}")
|
||||
|
||||
def _create_file_descriptor(self, file_path: str) -> None:
|
||||
try:
|
||||
self._file_descriptor = open_file(file_path, "a")
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
def batch_write_data_to_file(self) -> None:
|
||||
"""Write findings data to CSV."""
|
||||
try:
|
||||
if (
|
||||
getattr(self, "_file_descriptor", None)
|
||||
and not self._file_descriptor.closed
|
||||
and self._data
|
||||
):
|
||||
csv_writer = DictWriter(
|
||||
self._file_descriptor,
|
||||
fieldnames=[field.upper() for field in self._data[0].dict().keys()],
|
||||
delimiter=";",
|
||||
)
|
||||
if self._file_descriptor.tell() == 0:
|
||||
csv_writer.writeheader()
|
||||
for row in self._data:
|
||||
csv_writer.writerow({k.upper(): v for k, v in row.dict().items()})
|
||||
if self.close_file or self._from_cli:
|
||||
self._file_descriptor.close()
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
|
||||
class _ManualFindingStub:
|
||||
"""Minimal stub to satisfy _build_row for manual requirements."""
|
||||
|
||||
provider = ""
|
||||
account_uid = ""
|
||||
account_name = ""
|
||||
region = ""
|
||||
status = "MANUAL"
|
||||
status_extended = "Manual check"
|
||||
resource_uid = "manual_check"
|
||||
resource_name = "Manual check"
|
||||
check_id = "manual"
|
||||
muted = False
|
||||
494
prowler/lib/outputs/compliance/universal/universal_table.py
Normal file
494
prowler/lib/outputs/compliance/universal/universal_table.py
Normal file
@@ -0,0 +1,494 @@
|
||||
from colorama import Fore, Style
|
||||
from tabulate import tabulate
|
||||
|
||||
from prowler.config.config import orange_color
|
||||
from prowler.lib.check.compliance_models import ComplianceFramework
|
||||
|
||||
|
||||
def get_universal_table(
|
||||
findings: list,
|
||||
bulk_checks_metadata: dict,
|
||||
compliance_framework_name: str,
|
||||
output_filename: str,
|
||||
output_directory: str,
|
||||
compliance_overview: bool,
|
||||
framework: ComplianceFramework = None,
|
||||
provider: str = None,
|
||||
output_formats: list = None,
|
||||
) -> None:
|
||||
"""Render a compliance console table driven by TableConfig.
|
||||
|
||||
Supports 3 modes:
|
||||
- Grouped: GroupBy only (generic, C5, CSA, ISO, KISA)
|
||||
- Split: GroupBy + SplitBy (CIS Level 1/2, ENS alto/medio/bajo/opcional)
|
||||
- Scored: GroupBy + Scoring (ThreatScore weighted risk %)
|
||||
|
||||
When ``provider`` is given and ``Checks`` is a multi-provider dict,
|
||||
only the checks for that provider are matched against findings.
|
||||
"""
|
||||
if framework is None or not framework.Outputs or not framework.Outputs.Table_Config:
|
||||
return
|
||||
|
||||
tc = framework.Outputs.Table_Config
|
||||
labels = tc.Labels or _default_labels()
|
||||
|
||||
group_by = tc.GroupBy
|
||||
split_by = tc.SplitBy
|
||||
scoring = tc.Scoring
|
||||
|
||||
if scoring:
|
||||
_render_scored(
|
||||
findings,
|
||||
bulk_checks_metadata,
|
||||
compliance_framework_name,
|
||||
output_filename,
|
||||
output_directory,
|
||||
compliance_overview,
|
||||
framework,
|
||||
group_by,
|
||||
scoring,
|
||||
labels,
|
||||
provider,
|
||||
output_formats=output_formats,
|
||||
)
|
||||
elif split_by:
|
||||
_render_split(
|
||||
findings,
|
||||
bulk_checks_metadata,
|
||||
compliance_framework_name,
|
||||
output_filename,
|
||||
output_directory,
|
||||
compliance_overview,
|
||||
framework,
|
||||
group_by,
|
||||
split_by,
|
||||
labels,
|
||||
provider,
|
||||
output_formats=output_formats,
|
||||
)
|
||||
else:
|
||||
_render_grouped(
|
||||
findings,
|
||||
bulk_checks_metadata,
|
||||
compliance_framework_name,
|
||||
output_filename,
|
||||
output_directory,
|
||||
compliance_overview,
|
||||
framework,
|
||||
group_by,
|
||||
labels,
|
||||
provider,
|
||||
output_formats=output_formats,
|
||||
)
|
||||
|
||||
|
||||
def _default_labels():
|
||||
"""Return a simple namespace with default labels."""
|
||||
from prowler.lib.check.compliance_models import TableLabels
|
||||
|
||||
return TableLabels()
|
||||
|
||||
|
||||
def _build_requirement_check_map(framework, provider=None):
|
||||
"""Build a map of check_id -> list of requirements for fast lookup.
|
||||
|
||||
When *provider* is given and a requirement's ``Checks`` is a dict keyed
|
||||
by provider, only the checks for that provider are included.
|
||||
"""
|
||||
check_map = {}
|
||||
for req in framework.Requirements:
|
||||
checks = req.Checks
|
||||
if isinstance(checks, dict):
|
||||
if provider:
|
||||
all_checks = checks.get(provider.lower(), [])
|
||||
else:
|
||||
all_checks = []
|
||||
for check_list in checks.values():
|
||||
all_checks.extend(check_list)
|
||||
else:
|
||||
all_checks = checks
|
||||
for check_id in all_checks:
|
||||
if check_id not in check_map:
|
||||
check_map[check_id] = []
|
||||
check_map[check_id].append(req)
|
||||
return check_map
|
||||
|
||||
|
||||
def _get_group_key(req, group_by):
|
||||
"""Extract the group key from a requirement."""
|
||||
if group_by == "_Tactics":
|
||||
return req.Tactics or []
|
||||
return [req.Attributes.get(group_by, "Unknown")]
|
||||
|
||||
|
||||
def _print_overview(pass_count, fail_count, muted_count, framework_name, labels):
|
||||
"""Print the overview pass/fail/muted summary."""
|
||||
total = len(fail_count) + len(pass_count) + len(muted_count)
|
||||
if total < 2:
|
||||
return False
|
||||
|
||||
title = (
|
||||
labels.Title
|
||||
or f"Compliance Status of {Fore.YELLOW}{framework_name.upper()}{Style.RESET_ALL} Framework:"
|
||||
)
|
||||
print(f"\n{title}")
|
||||
|
||||
fail_pct = round(len(fail_count) / total * 100, 2)
|
||||
pass_pct = round(len(pass_count) / total * 100, 2)
|
||||
muted_pct = round(len(muted_count) / total * 100, 2)
|
||||
|
||||
fail_label = labels.FailLabel
|
||||
pass_label = labels.PassLabel
|
||||
|
||||
overview_table = [
|
||||
[
|
||||
f"{Fore.RED}{fail_pct}% ({len(fail_count)}) {fail_label}{Style.RESET_ALL}",
|
||||
f"{Fore.GREEN}{pass_pct}% ({len(pass_count)}) {pass_label}{Style.RESET_ALL}",
|
||||
f"{orange_color}{muted_pct}% ({len(muted_count)}) MUTED{Style.RESET_ALL}",
|
||||
]
|
||||
]
|
||||
print(tabulate(overview_table, tablefmt="rounded_grid"))
|
||||
return True
|
||||
|
||||
|
||||
def _render_grouped(
|
||||
findings,
|
||||
bulk_checks_metadata,
|
||||
compliance_framework_name,
|
||||
output_filename,
|
||||
output_directory,
|
||||
compliance_overview,
|
||||
framework,
|
||||
group_by,
|
||||
labels,
|
||||
provider=None,
|
||||
output_formats=None,
|
||||
):
|
||||
"""Grouped mode: one row per group with pass/fail counts."""
|
||||
check_map = _build_requirement_check_map(framework, provider)
|
||||
groups = {}
|
||||
pass_count = []
|
||||
fail_count = []
|
||||
muted_count = []
|
||||
|
||||
for index, finding in enumerate(findings):
|
||||
check_id = finding.check_metadata.CheckID
|
||||
if check_id not in check_map:
|
||||
continue
|
||||
|
||||
for req in check_map[check_id]:
|
||||
for group_key in _get_group_key(req, group_by):
|
||||
if group_key not in groups:
|
||||
groups[group_key] = {"FAIL": 0, "PASS": 0, "Muted": 0}
|
||||
|
||||
if finding.muted:
|
||||
if index not in muted_count:
|
||||
muted_count.append(index)
|
||||
groups[group_key]["Muted"] += 1
|
||||
else:
|
||||
if finding.status == "FAIL" and index not in fail_count:
|
||||
fail_count.append(index)
|
||||
groups[group_key]["FAIL"] += 1
|
||||
elif finding.status == "PASS" and index not in pass_count:
|
||||
pass_count.append(index)
|
||||
groups[group_key]["PASS"] += 1
|
||||
|
||||
if not _print_overview(
|
||||
pass_count, fail_count, muted_count, compliance_framework_name, labels
|
||||
):
|
||||
return
|
||||
|
||||
if not compliance_overview:
|
||||
provider_header = labels.ProviderHeader
|
||||
group_header = labels.GroupHeader or group_by
|
||||
table = {
|
||||
provider_header: [],
|
||||
group_header: [],
|
||||
labels.StatusHeader: [],
|
||||
"Muted": [],
|
||||
}
|
||||
for group_key in sorted(groups):
|
||||
table[provider_header].append(
|
||||
framework.Provider or (provider.upper() if provider else "")
|
||||
)
|
||||
table[group_header].append(group_key)
|
||||
if groups[group_key]["FAIL"] > 0:
|
||||
table[labels.StatusHeader].append(
|
||||
f"{Fore.RED}{labels.FailLabel}({groups[group_key]['FAIL']}){Style.RESET_ALL}"
|
||||
)
|
||||
else:
|
||||
table[labels.StatusHeader].append(
|
||||
f"{Fore.GREEN}{labels.PassLabel}({groups[group_key]['PASS']}){Style.RESET_ALL}"
|
||||
)
|
||||
table["Muted"].append(
|
||||
f"{orange_color}{groups[group_key]['Muted']}{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
results_title = (
|
||||
labels.ResultsTitle
|
||||
or f"Framework {Fore.YELLOW}{compliance_framework_name.upper()}{Style.RESET_ALL} Results:"
|
||||
)
|
||||
print(f"\n{results_title}")
|
||||
print(tabulate(table, headers="keys", tablefmt="rounded_grid"))
|
||||
footer = labels.FooterNote or "* Only sections containing results appear."
|
||||
print(f"{Style.BRIGHT}{footer}{Style.RESET_ALL}")
|
||||
print(f"\nDetailed results of {compliance_framework_name.upper()} are in:")
|
||||
print(
|
||||
f" - CSV: {output_directory}/compliance/{output_filename}_{compliance_framework_name}.csv"
|
||||
)
|
||||
if "json-ocsf" in (output_formats or []):
|
||||
print(
|
||||
f" - OCSF: {output_directory}/compliance/{output_filename}_{compliance_framework_name}.ocsf.json"
|
||||
)
|
||||
print()
|
||||
|
||||
|
||||
def _render_split(
|
||||
findings,
|
||||
bulk_checks_metadata,
|
||||
compliance_framework_name,
|
||||
output_filename,
|
||||
output_directory,
|
||||
compliance_overview,
|
||||
framework,
|
||||
group_by,
|
||||
split_by,
|
||||
labels,
|
||||
provider=None,
|
||||
output_formats=None,
|
||||
):
|
||||
"""Split mode: one row per group with columns for each split value (e.g. Level 1/Level 2)."""
|
||||
check_map = _build_requirement_check_map(framework, provider)
|
||||
split_field = split_by.Field
|
||||
split_values = split_by.Values
|
||||
groups = {}
|
||||
pass_count = []
|
||||
fail_count = []
|
||||
muted_count = []
|
||||
|
||||
for index, finding in enumerate(findings):
|
||||
check_id = finding.check_metadata.CheckID
|
||||
if check_id not in check_map:
|
||||
continue
|
||||
|
||||
for req in check_map[check_id]:
|
||||
for group_key in _get_group_key(req, group_by):
|
||||
if group_key not in groups:
|
||||
groups[group_key] = {
|
||||
sv: {"FAIL": 0, "PASS": 0} for sv in split_values
|
||||
}
|
||||
groups[group_key]["Muted"] = 0
|
||||
|
||||
split_val = req.Attributes.get(split_field, "")
|
||||
|
||||
if finding.muted:
|
||||
if index not in muted_count:
|
||||
muted_count.append(index)
|
||||
groups[group_key]["Muted"] += 1
|
||||
else:
|
||||
if finding.status == "FAIL" and index not in fail_count:
|
||||
fail_count.append(index)
|
||||
elif finding.status == "PASS" and index not in pass_count:
|
||||
pass_count.append(index)
|
||||
|
||||
for sv in split_values:
|
||||
if sv in str(split_val):
|
||||
if not finding.muted:
|
||||
if finding.status == "FAIL":
|
||||
groups[group_key][sv]["FAIL"] += 1
|
||||
else:
|
||||
groups[group_key][sv]["PASS"] += 1
|
||||
|
||||
if not _print_overview(
|
||||
pass_count, fail_count, muted_count, compliance_framework_name, labels
|
||||
):
|
||||
return
|
||||
|
||||
if not compliance_overview:
|
||||
provider_header = labels.ProviderHeader
|
||||
group_header = labels.GroupHeader or group_by
|
||||
table = {provider_header: [], group_header: []}
|
||||
for sv in split_values:
|
||||
table[sv] = []
|
||||
table["Muted"] = []
|
||||
|
||||
for group_key in sorted(groups):
|
||||
table[provider_header].append(
|
||||
framework.Provider or (provider.upper() if provider else "")
|
||||
)
|
||||
table[group_header].append(group_key)
|
||||
for sv in split_values:
|
||||
if groups[group_key][sv]["FAIL"] > 0:
|
||||
table[sv].append(
|
||||
f"{Fore.RED}{labels.FailLabel}({groups[group_key][sv]['FAIL']}){Style.RESET_ALL}"
|
||||
)
|
||||
else:
|
||||
table[sv].append(
|
||||
f"{Fore.GREEN}{labels.PassLabel}({groups[group_key][sv]['PASS']}){Style.RESET_ALL}"
|
||||
)
|
||||
table["Muted"].append(
|
||||
f"{orange_color}{groups[group_key]['Muted']}{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
results_title = (
|
||||
labels.ResultsTitle
|
||||
or f"Framework {Fore.YELLOW}{compliance_framework_name.upper()}{Style.RESET_ALL} Results:"
|
||||
)
|
||||
print(f"\n{results_title}")
|
||||
print(tabulate(table, headers="keys", tablefmt="rounded_grid"))
|
||||
footer = labels.FooterNote or "* Only sections containing results appear."
|
||||
print(f"{Style.BRIGHT}{footer}{Style.RESET_ALL}")
|
||||
print(f"\nDetailed results of {compliance_framework_name.upper()} are in:")
|
||||
print(
|
||||
f" - CSV: {output_directory}/compliance/{output_filename}_{compliance_framework_name}.csv"
|
||||
)
|
||||
if "json-ocsf" in (output_formats or []):
|
||||
print(
|
||||
f" - OCSF: {output_directory}/compliance/{output_filename}_{compliance_framework_name}.ocsf.json"
|
||||
)
|
||||
print()
|
||||
|
||||
|
||||
def _render_scored(
|
||||
findings,
|
||||
bulk_checks_metadata,
|
||||
compliance_framework_name,
|
||||
output_filename,
|
||||
output_directory,
|
||||
compliance_overview,
|
||||
framework,
|
||||
group_by,
|
||||
scoring,
|
||||
labels,
|
||||
provider=None,
|
||||
output_formats=None,
|
||||
):
|
||||
"""Scored mode: weighted risk scoring per group (e.g. ThreatScore)."""
|
||||
check_map = _build_requirement_check_map(framework, provider)
|
||||
risk_field = scoring.RiskField
|
||||
weight_field = scoring.WeightField
|
||||
groups = {}
|
||||
pass_count = []
|
||||
fail_count = []
|
||||
muted_count = []
|
||||
|
||||
score_per_group = {}
|
||||
max_score_per_group = {}
|
||||
counted_per_group = {}
|
||||
generic_score = 0
|
||||
max_generic_score = 0
|
||||
counted_generic = []
|
||||
|
||||
for index, finding in enumerate(findings):
|
||||
check_id = finding.check_metadata.CheckID
|
||||
if check_id not in check_map:
|
||||
continue
|
||||
|
||||
for req in check_map[check_id]:
|
||||
for group_key in _get_group_key(req, group_by):
|
||||
attrs = req.Attributes
|
||||
risk = attrs.get(risk_field, 0)
|
||||
weight = attrs.get(weight_field, 0)
|
||||
|
||||
if group_key not in groups:
|
||||
groups[group_key] = {"FAIL": 0, "PASS": 0, "Muted": 0}
|
||||
score_per_group[group_key] = 0
|
||||
max_score_per_group[group_key] = 0
|
||||
counted_per_group[group_key] = []
|
||||
|
||||
if index not in counted_per_group[group_key] and not finding.muted:
|
||||
if finding.status == "PASS":
|
||||
score_per_group[group_key] += risk * weight
|
||||
max_score_per_group[group_key] += risk * weight
|
||||
counted_per_group[group_key].append(index)
|
||||
|
||||
if finding.muted:
|
||||
if index not in muted_count:
|
||||
muted_count.append(index)
|
||||
groups[group_key]["Muted"] += 1
|
||||
else:
|
||||
if finding.status == "FAIL" and index not in fail_count:
|
||||
fail_count.append(index)
|
||||
groups[group_key]["FAIL"] += 1
|
||||
elif finding.status == "PASS" and index not in pass_count:
|
||||
pass_count.append(index)
|
||||
groups[group_key]["PASS"] += 1
|
||||
|
||||
if index not in counted_generic and not finding.muted:
|
||||
if finding.status == "PASS":
|
||||
generic_score += risk * weight
|
||||
max_generic_score += risk * weight
|
||||
counted_generic.append(index)
|
||||
|
||||
if not _print_overview(
|
||||
pass_count, fail_count, muted_count, compliance_framework_name, labels
|
||||
):
|
||||
return
|
||||
|
||||
if not compliance_overview:
|
||||
provider_header = labels.ProviderHeader
|
||||
group_header = labels.GroupHeader or group_by
|
||||
table = {
|
||||
provider_header: [],
|
||||
group_header: [],
|
||||
labels.StatusHeader: [],
|
||||
"Score": [],
|
||||
"Muted": [],
|
||||
}
|
||||
|
||||
for group_key in sorted(groups):
|
||||
table[provider_header].append(
|
||||
framework.Provider or (provider.upper() if provider else "")
|
||||
)
|
||||
table[group_header].append(group_key)
|
||||
if max_score_per_group[group_key] == 0:
|
||||
group_score = 100.0
|
||||
score_color = Fore.GREEN
|
||||
else:
|
||||
group_score = (
|
||||
score_per_group[group_key] / max_score_per_group[group_key]
|
||||
) * 100
|
||||
score_color = Fore.RED
|
||||
table["Score"].append(
|
||||
f"{Style.BRIGHT}{score_color}{group_score:.2f}%{Style.RESET_ALL}"
|
||||
)
|
||||
if groups[group_key]["FAIL"] > 0:
|
||||
table[labels.StatusHeader].append(
|
||||
f"{Fore.RED}{labels.FailLabel}({groups[group_key]['FAIL']}){Style.RESET_ALL}"
|
||||
)
|
||||
else:
|
||||
table[labels.StatusHeader].append(
|
||||
f"{Fore.GREEN}{labels.PassLabel}({groups[group_key]['PASS']}){Style.RESET_ALL}"
|
||||
)
|
||||
table["Muted"].append(
|
||||
f"{orange_color}{groups[group_key]['Muted']}{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
if max_generic_score == 0:
|
||||
generic_threat_score = 100.0
|
||||
else:
|
||||
generic_threat_score = generic_score / max_generic_score * 100
|
||||
|
||||
results_title = (
|
||||
labels.ResultsTitle
|
||||
or f"Framework {Fore.YELLOW}{compliance_framework_name.upper()}{Style.RESET_ALL} Results:"
|
||||
)
|
||||
print(f"\n{results_title}")
|
||||
print(f"\nGeneric Threat Score: {generic_threat_score:.2f}%")
|
||||
print(tabulate(table, headers="keys", tablefmt="rounded_grid"))
|
||||
footer = labels.FooterNote or (
|
||||
f"{Style.BRIGHT}\n=== Threat Score Guide ===\n"
|
||||
f"The lower the score, the higher the risk.{Style.RESET_ALL}\n"
|
||||
f"{Style.BRIGHT}(Only sections containing results appear, the score is calculated as the sum of the "
|
||||
f"level of risk * weight of the passed findings divided by the sum of the risk * weight of all the findings){Style.RESET_ALL}"
|
||||
)
|
||||
print(footer)
|
||||
print(f"\nDetailed results of {compliance_framework_name.upper()} are in:")
|
||||
print(
|
||||
f" - CSV: {output_directory}/compliance/{output_filename}_{compliance_framework_name}.csv"
|
||||
)
|
||||
if "json-ocsf" in (output_formats or []):
|
||||
print(
|
||||
f" - OCSF: {output_directory}/compliance/{output_filename}_{compliance_framework_name}.ocsf.json"
|
||||
)
|
||||
print()
|
||||
1042
tests/lib/check/universal_compliance_models_test.py
Normal file
1042
tests/lib/check/universal_compliance_models_test.py
Normal file
File diff suppressed because it is too large
Load Diff
0
tests/lib/outputs/compliance/universal/__init__.py
Normal file
0
tests/lib/outputs/compliance/universal/__init__.py
Normal file
448
tests/lib/outputs/compliance/universal/ocsf_compliance_test.py
Normal file
448
tests/lib/outputs/compliance/universal/ocsf_compliance_test.py
Normal file
@@ -0,0 +1,448 @@
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from types import SimpleNamespace
|
||||
|
||||
from py_ocsf_models.events.findings.compliance_finding import ComplianceFinding
|
||||
from py_ocsf_models.events.findings.compliance_finding_type_id import (
|
||||
ComplianceFindingTypeID,
|
||||
)
|
||||
from py_ocsf_models.objects.compliance_status import StatusID as ComplianceStatusID
|
||||
|
||||
from prowler.lib.check.compliance_models import (
|
||||
AttributeMetadata,
|
||||
ComplianceFramework,
|
||||
OutputsConfig,
|
||||
TableConfig,
|
||||
UniversalComplianceRequirement,
|
||||
)
|
||||
from prowler.lib.outputs.compliance.universal.ocsf_compliance import (
|
||||
OCSFComplianceOutput,
|
||||
)
|
||||
|
||||
|
||||
def _make_finding(check_id, status="PASS", provider="aws"):
|
||||
"""Create a mock Finding with all fields needed by OCSFComplianceOutput."""
|
||||
finding = SimpleNamespace()
|
||||
finding.provider = provider
|
||||
finding.account_uid = "123456789012"
|
||||
finding.account_name = "test-account"
|
||||
finding.account_email = ""
|
||||
finding.account_organization_uid = "org-123"
|
||||
finding.account_organization_name = "test-org"
|
||||
finding.account_tags = {"env": "test"}
|
||||
finding.region = "us-east-1"
|
||||
finding.status = status
|
||||
finding.status_extended = f"{check_id} is {status}"
|
||||
finding.resource_uid = f"arn:aws:iam::123456789012:{check_id}"
|
||||
finding.resource_name = check_id
|
||||
finding.resource_details = "some details"
|
||||
finding.resource_metadata = {}
|
||||
finding.resource_tags = {"Name": "test"}
|
||||
finding.partition = "aws"
|
||||
finding.muted = False
|
||||
finding.check_id = check_id
|
||||
finding.uid = "test-finding-uid"
|
||||
finding.timestamp = datetime(2025, 1, 15, 12, 0, 0, tzinfo=timezone.utc)
|
||||
finding.prowler_version = "5.0.0"
|
||||
finding.compliance = {}
|
||||
finding.metadata = SimpleNamespace(
|
||||
Provider=provider,
|
||||
CheckID=check_id,
|
||||
CheckTitle=f"Title for {check_id}",
|
||||
CheckType=["test-type"],
|
||||
Description=f"Description for {check_id}",
|
||||
Severity="medium",
|
||||
ServiceName="iam",
|
||||
ResourceType="aws-iam-role",
|
||||
Risk="test-risk",
|
||||
RelatedUrl="https://example.com",
|
||||
Remediation=SimpleNamespace(
|
||||
Recommendation=SimpleNamespace(Text="Fix it", Url="https://fix.com"),
|
||||
),
|
||||
DependsOn=[],
|
||||
RelatedTo=[],
|
||||
Categories=["test"],
|
||||
Notes="",
|
||||
AdditionalURLs=[],
|
||||
)
|
||||
return finding
|
||||
|
||||
|
||||
def _make_framework(requirements, attrs_metadata=None):
|
||||
return ComplianceFramework(
|
||||
Framework="TestFW",
|
||||
Name="Test Framework",
|
||||
Provider="AWS",
|
||||
Version="1.0",
|
||||
Description="Test framework",
|
||||
Requirements=requirements,
|
||||
AttributesMetadata=attrs_metadata,
|
||||
Outputs=OutputsConfig(Table_Config=TableConfig(GroupBy="Section")),
|
||||
)
|
||||
|
||||
|
||||
def _simple_requirement(req_id="REQ-1", checks=None):
|
||||
return UniversalComplianceRequirement(
|
||||
Id=req_id,
|
||||
Description=f"Description for {req_id}",
|
||||
Attributes={},
|
||||
Checks=checks if checks is not None else ["check_a"],
|
||||
)
|
||||
|
||||
|
||||
class TestOCSFComplianceOutput:
|
||||
def test_transform_basic(self):
|
||||
fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
|
||||
findings = [_make_finding("check_a", "PASS")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
assert len(output.data) == 1
|
||||
assert isinstance(output.data[0], ComplianceFinding)
|
||||
|
||||
def test_class_uid(self):
|
||||
fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
|
||||
findings = [_make_finding("check_a")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
assert output.data[0].class_uid == 2003
|
||||
|
||||
def test_type_uid(self):
|
||||
fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
|
||||
findings = [_make_finding("check_a")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
assert output.data[0].type_uid == ComplianceFindingTypeID.Create
|
||||
assert output.data[0].type_uid == 200301
|
||||
|
||||
def test_compliance_object_fields(self):
|
||||
fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
|
||||
findings = [_make_finding("check_a", "PASS")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
compliance = output.data[0].compliance
|
||||
assert compliance.standards == ["TestFW-1.0"]
|
||||
assert compliance.requirements == ["REQ-1"]
|
||||
assert compliance.control == "Description for REQ-1"
|
||||
assert compliance.status_id == ComplianceStatusID.Pass
|
||||
|
||||
def test_check_object_fields(self):
|
||||
fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
|
||||
findings = [_make_finding("check_a", "FAIL")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
checks = output.data[0].compliance.checks
|
||||
assert len(checks) == 1
|
||||
assert checks[0].uid == "check_a"
|
||||
assert checks[0].name == "Title for check_a"
|
||||
assert checks[0].status == "FAIL"
|
||||
assert checks[0].status_id == ComplianceStatusID.Fail
|
||||
|
||||
def test_finding_info_fields(self):
|
||||
fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
|
||||
findings = [_make_finding("check_a")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
info = output.data[0].finding_info
|
||||
assert info.uid == "test-finding-uid-REQ-1"
|
||||
assert info.title == "REQ-1"
|
||||
assert info.desc == "Description for REQ-1"
|
||||
|
||||
def test_metadata_fields(self):
|
||||
fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
|
||||
findings = [_make_finding("check_a")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
metadata = output.data[0].metadata
|
||||
assert metadata.product.name == "Prowler"
|
||||
assert metadata.product.uid == "prowler"
|
||||
assert metadata.event_code == "check_a"
|
||||
|
||||
def test_status_mapping_pass(self):
|
||||
fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
|
||||
findings = [_make_finding("check_a", "PASS")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
assert output.data[0].compliance.status_id == ComplianceStatusID.Pass
|
||||
|
||||
def test_status_mapping_fail(self):
|
||||
fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
|
||||
findings = [_make_finding("check_a", "FAIL")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
assert output.data[0].compliance.status_id == ComplianceStatusID.Fail
|
||||
|
||||
def test_manual_requirement(self):
|
||||
req = _simple_requirement("MANUAL-1", checks=[])
|
||||
fw = _make_framework([req])
|
||||
findings = [_make_finding("check_a")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
assert len(output.data) == 1
|
||||
cf = output.data[0]
|
||||
assert cf.compliance.status_id == ComplianceStatusID.Unknown
|
||||
assert cf.status_code == "MANUAL"
|
||||
assert cf.finding_info.uid == "manual-MANUAL-1"
|
||||
|
||||
def test_multi_provider_checks_dict(self):
|
||||
req = UniversalComplianceRequirement(
|
||||
Id="REQ-1",
|
||||
Description="Multi-provider req",
|
||||
Attributes={},
|
||||
Checks={"aws": ["check_a"], "azure": ["check_b"]},
|
||||
)
|
||||
fw = _make_framework([req])
|
||||
findings = [_make_finding("check_a", "PASS", provider="aws")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
assert len(output.data) == 1
|
||||
assert output.data[0].compliance.checks[0].uid == "check_a"
|
||||
|
||||
def test_empty_findings(self):
|
||||
fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
|
||||
|
||||
output = OCSFComplianceOutput(findings=[], framework=fw, provider="aws")
|
||||
|
||||
assert output.data == []
|
||||
|
||||
def test_cloud_info_in_unmapped(self):
|
||||
fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
|
||||
findings = [_make_finding("check_a", provider="aws")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
cf = output.data[0]
|
||||
assert cf.unmapped is not None
|
||||
assert cf.unmapped["cloud"]["provider"] == "aws"
|
||||
assert cf.unmapped["cloud"]["account"]["uid"] == "123456789012"
|
||||
assert cf.unmapped["cloud"]["account"]["name"] == "test-account"
|
||||
|
||||
def test_resources_populated(self):
|
||||
fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
|
||||
findings = [_make_finding("check_a")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
resources = output.data[0].resources
|
||||
assert len(resources) == 1
|
||||
assert resources[0].name == "check_a"
|
||||
assert resources[0].uid == "arn:aws:iam::123456789012:check_a"
|
||||
assert resources[0].type == "aws-iam-role"
|
||||
|
||||
def test_batch_write_to_file(self, tmp_path):
|
||||
fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
|
||||
findings = [_make_finding("check_a", "PASS")]
|
||||
filepath = str(tmp_path / "compliance.ocsf.json")
|
||||
|
||||
output = OCSFComplianceOutput(
|
||||
findings=findings, framework=fw, file_path=filepath, provider="aws"
|
||||
)
|
||||
output.batch_write_data_to_file()
|
||||
|
||||
with open(filepath) as f:
|
||||
data = json.load(f)
|
||||
|
||||
assert isinstance(data, list)
|
||||
assert len(data) == 1
|
||||
assert data[0]["class_uid"] == 2003
|
||||
assert data[0]["compliance"]["standards"] == ["TestFW-1.0"]
|
||||
assert data[0]["compliance"]["requirements"] == ["REQ-1"]
|
||||
|
||||
def test_multiple_findings_same_requirement(self):
|
||||
fw = _make_framework([_simple_requirement("REQ-1", ["check_a"])])
|
||||
findings = [
|
||||
_make_finding("check_a", "PASS"),
|
||||
_make_finding("check_a", "FAIL"),
|
||||
]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
assert len(output.data) == 2
|
||||
statuses = [cf.compliance.status_id for cf in output.data]
|
||||
assert ComplianceStatusID.Pass in statuses
|
||||
assert ComplianceStatusID.Fail in statuses
|
||||
|
||||
def test_requirement_attributes_in_unmapped(self):
|
||||
req = UniversalComplianceRequirement(
|
||||
Id="REQ-1",
|
||||
Description="Test requirement",
|
||||
Attributes={"Section": "IAM", "Profile": "Level 1"},
|
||||
Checks=["check_a"],
|
||||
)
|
||||
fw = _make_framework([req])
|
||||
findings = [_make_finding("check_a", "PASS")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
cf = output.data[0]
|
||||
assert cf.unmapped is not None
|
||||
assert "requirement_attributes" in cf.unmapped
|
||||
assert cf.unmapped["requirement_attributes"]["section"] == "IAM"
|
||||
assert cf.unmapped["requirement_attributes"]["profile"] == "Level 1"
|
||||
|
||||
def test_requirement_attributes_keys_are_snake_case(self):
|
||||
req = UniversalComplianceRequirement(
|
||||
Id="REQ-1",
|
||||
Description="Test requirement",
|
||||
Attributes={"Section": "IAM", "CCMLite": "Yes", "SubSection": "1.1"},
|
||||
Checks=["check_a"],
|
||||
)
|
||||
fw = _make_framework([req])
|
||||
findings = [_make_finding("check_a", "PASS")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
attrs = output.data[0].unmapped["requirement_attributes"]
|
||||
assert "section" in attrs
|
||||
assert "ccm_lite" in attrs
|
||||
assert "sub_section" in attrs
|
||||
|
||||
def test_requirement_attributes_empty_attrs_excluded(self):
|
||||
req = _simple_requirement("REQ-1", checks=["check_a"])
|
||||
fw = _make_framework([req])
|
||||
findings = [_make_finding("check_a")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
cf = output.data[0]
|
||||
# Cloud info is still present, but no requirement_attributes key
|
||||
assert cf.unmapped is not None
|
||||
assert "cloud" in cf.unmapped
|
||||
assert "requirement_attributes" not in cf.unmapped
|
||||
|
||||
def test_manual_requirement_has_attributes_in_unmapped(self):
|
||||
req = UniversalComplianceRequirement(
|
||||
Id="MANUAL-1",
|
||||
Description="Manual check",
|
||||
Attributes={"Section": "Logging", "Type": "manual"},
|
||||
Checks=[],
|
||||
)
|
||||
fw = _make_framework([req])
|
||||
findings = [_make_finding("check_a")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
assert len(output.data) == 1
|
||||
cf = output.data[0]
|
||||
assert cf.unmapped is not None
|
||||
assert cf.unmapped["requirement_attributes"]["section"] == "Logging"
|
||||
assert cf.unmapped["requirement_attributes"]["type"] == "manual"
|
||||
# Manual findings have no cloud info (finding is None)
|
||||
assert "cloud" not in cf.unmapped
|
||||
|
||||
def test_ocsf_metadata_filters_attributes(self):
|
||||
"""Attributes with OCSF=False in metadata should be excluded from unmapped."""
|
||||
metadata = [
|
||||
AttributeMetadata(Key="Section", Type="str", OCSF=True),
|
||||
AttributeMetadata(Key="InternalNote", Type="str", OCSF=False),
|
||||
AttributeMetadata(Key="Profile", Type="str", OCSF=True),
|
||||
]
|
||||
req = UniversalComplianceRequirement(
|
||||
Id="REQ-1",
|
||||
Description="Test",
|
||||
Attributes={
|
||||
"Section": "IAM",
|
||||
"InternalNote": "skip me",
|
||||
"Profile": "Level 1",
|
||||
},
|
||||
Checks=["check_a"],
|
||||
)
|
||||
fw = _make_framework([req], attrs_metadata=metadata)
|
||||
findings = [_make_finding("check_a", "PASS")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
attrs = output.data[0].unmapped["requirement_attributes"]
|
||||
assert "section" in attrs
|
||||
assert "profile" in attrs
|
||||
assert "internal_note" not in attrs
|
||||
|
||||
def test_ocsf_metadata_all_false_excludes_all(self):
|
||||
"""When all attributes have OCSF=False, requirement_attributes should be empty."""
|
||||
metadata = [
|
||||
AttributeMetadata(Key="Section", Type="str", OCSF=False),
|
||||
]
|
||||
req = UniversalComplianceRequirement(
|
||||
Id="REQ-1",
|
||||
Description="Test",
|
||||
Attributes={"Section": "IAM"},
|
||||
Checks=["check_a"],
|
||||
)
|
||||
fw = _make_framework([req], attrs_metadata=metadata)
|
||||
findings = [_make_finding("check_a", "PASS")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
cf = output.data[0]
|
||||
assert cf.unmapped is not None
|
||||
# requirement_attributes should not appear since all attrs are filtered out
|
||||
assert "requirement_attributes" not in cf.unmapped
|
||||
|
||||
def test_ocsf_no_metadata_includes_all(self):
|
||||
"""Without AttributesMetadata, all attributes should be included (backward compat)."""
|
||||
req = UniversalComplianceRequirement(
|
||||
Id="REQ-1",
|
||||
Description="Test",
|
||||
Attributes={"Section": "IAM", "Custom": "value"},
|
||||
Checks=["check_a"],
|
||||
)
|
||||
fw = _make_framework([req], attrs_metadata=None)
|
||||
findings = [_make_finding("check_a", "PASS")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
attrs = output.data[0].unmapped["requirement_attributes"]
|
||||
assert "section" in attrs
|
||||
assert "custom" in attrs
|
||||
|
||||
def test_ocsf_default_is_true(self):
|
||||
"""OCSF defaults to True — attributes are included unless explicitly excluded."""
|
||||
metadata = [
|
||||
AttributeMetadata(Key="Section", Type="str"),
|
||||
AttributeMetadata(Key="Profile", Type="str"),
|
||||
]
|
||||
req = UniversalComplianceRequirement(
|
||||
Id="REQ-1",
|
||||
Description="Test",
|
||||
Attributes={"Section": "IAM", "Profile": "Level 1"},
|
||||
Checks=["check_a"],
|
||||
)
|
||||
fw = _make_framework([req], attrs_metadata=metadata)
|
||||
findings = [_make_finding("check_a", "PASS")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
attrs = output.data[0].unmapped["requirement_attributes"]
|
||||
assert "section" in attrs
|
||||
assert "profile" in attrs
|
||||
|
||||
def test_ocsf_filter_on_manual_requirements(self):
|
||||
"""OCSF filtering should also apply to manual requirements."""
|
||||
metadata = [
|
||||
AttributeMetadata(Key="Section", Type="str", OCSF=True),
|
||||
AttributeMetadata(Key="InternalNote", Type="str", OCSF=False),
|
||||
]
|
||||
req = UniversalComplianceRequirement(
|
||||
Id="MANUAL-1",
|
||||
Description="Manual",
|
||||
Attributes={"Section": "Logging", "InternalNote": "hidden"},
|
||||
Checks=[],
|
||||
)
|
||||
fw = _make_framework([req], attrs_metadata=metadata)
|
||||
findings = [_make_finding("check_a")]
|
||||
|
||||
output = OCSFComplianceOutput(findings=findings, framework=fw, provider="aws")
|
||||
|
||||
cf = output.data[0]
|
||||
assert cf.unmapped["requirement_attributes"]["section"] == "Logging"
|
||||
assert "internal_note" not in cf.unmapped["requirement_attributes"]
|
||||
551
tests/lib/outputs/compliance/universal/universal_output_test.py
Normal file
551
tests/lib/outputs/compliance/universal/universal_output_test.py
Normal file
@@ -0,0 +1,551 @@
|
||||
from types import SimpleNamespace
|
||||
|
||||
from prowler.lib.check.compliance_models import (
|
||||
AttributeMetadata,
|
||||
ComplianceFramework,
|
||||
OutputsConfig,
|
||||
TableConfig,
|
||||
UniversalComplianceRequirement,
|
||||
)
|
||||
from prowler.lib.outputs.compliance.universal.universal_output import (
|
||||
UniversalComplianceOutput,
|
||||
)
|
||||
|
||||
|
||||
def _make_finding(check_id, status="PASS", compliance_map=None):
|
||||
"""Create a mock Finding for output tests."""
|
||||
finding = SimpleNamespace()
|
||||
finding.provider = "aws"
|
||||
finding.account_uid = "123456789012"
|
||||
finding.account_name = "test-account"
|
||||
finding.region = "us-east-1"
|
||||
finding.status = status
|
||||
finding.status_extended = f"{check_id} is {status}"
|
||||
finding.resource_uid = f"arn:aws:iam::123456789012:{check_id}"
|
||||
finding.resource_name = check_id
|
||||
finding.muted = False
|
||||
finding.check_id = check_id
|
||||
finding.metadata = SimpleNamespace(
|
||||
Provider="aws",
|
||||
CheckID=check_id,
|
||||
Severity="medium",
|
||||
)
|
||||
finding.compliance = compliance_map or {}
|
||||
return finding
|
||||
|
||||
|
||||
def _make_framework(requirements, attrs_metadata=None, table_config=None):
|
||||
return ComplianceFramework(
|
||||
Framework="TestFW",
|
||||
Name="Test Framework",
|
||||
Provider="AWS",
|
||||
Version="1.0",
|
||||
Description="Test framework",
|
||||
Requirements=requirements,
|
||||
AttributesMetadata=attrs_metadata,
|
||||
Outputs=OutputsConfig(Table_Config=table_config) if table_config else None,
|
||||
)
|
||||
|
||||
|
||||
class TestDynamicCSVColumns:
|
||||
def test_columns_match_metadata(self, tmp_path):
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.1",
|
||||
Description="test",
|
||||
Attributes={"Section": "IAM", "SubSection": "Auth"},
|
||||
Checks=["check_a"],
|
||||
),
|
||||
]
|
||||
metadata = [
|
||||
AttributeMetadata(Key="Section", Type="str"),
|
||||
AttributeMetadata(Key="SubSection", Type="str"),
|
||||
]
|
||||
fw = _make_framework(reqs, metadata, TableConfig(GroupBy="Section"))
|
||||
|
||||
findings = [
|
||||
_make_finding("check_a", "PASS", {"TestFW-1.0": ["1.1"]}),
|
||||
]
|
||||
filepath = str(tmp_path / "test.csv")
|
||||
|
||||
output = UniversalComplianceOutput(
|
||||
findings=findings,
|
||||
framework=fw,
|
||||
file_path=filepath,
|
||||
)
|
||||
|
||||
assert len(output.data) == 1
|
||||
row_dict = output.data[0].dict()
|
||||
assert "Requirements_Attributes_Section" in row_dict
|
||||
assert "Requirements_Attributes_SubSection" in row_dict
|
||||
assert row_dict["Requirements_Attributes_Section"] == "IAM"
|
||||
assert row_dict["Requirements_Attributes_SubSection"] == "Auth"
|
||||
|
||||
|
||||
class TestManualRequirements:
|
||||
def test_manual_status(self, tmp_path):
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.1",
|
||||
Description="test",
|
||||
Attributes={"Section": "IAM"},
|
||||
Checks=["check_a"],
|
||||
),
|
||||
UniversalComplianceRequirement(
|
||||
Id="manual-1",
|
||||
Description="manual check",
|
||||
Attributes={"Section": "Governance"},
|
||||
Checks=[],
|
||||
),
|
||||
]
|
||||
metadata = [
|
||||
AttributeMetadata(Key="Section", Type="str"),
|
||||
]
|
||||
fw = _make_framework(reqs, metadata, TableConfig(GroupBy="Section"))
|
||||
|
||||
findings = [
|
||||
_make_finding("check_a", "PASS", {"TestFW-1.0": ["1.1"]}),
|
||||
]
|
||||
filepath = str(tmp_path / "test.csv")
|
||||
|
||||
output = UniversalComplianceOutput(
|
||||
findings=findings,
|
||||
framework=fw,
|
||||
file_path=filepath,
|
||||
)
|
||||
|
||||
# Should have 1 real finding + 1 manual
|
||||
assert len(output.data) == 2
|
||||
manual_rows = [r for r in output.data if r.dict()["Status"] == "MANUAL"]
|
||||
assert len(manual_rows) == 1
|
||||
assert manual_rows[0].dict()["Requirements_Id"] == "manual-1"
|
||||
assert manual_rows[0].dict()["ResourceId"] == "manual_check"
|
||||
|
||||
|
||||
class TestMITREExtraColumns:
|
||||
def test_mitre_columns_present(self, tmp_path):
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
Id="T1190",
|
||||
Description="Exploit",
|
||||
Attributes={},
|
||||
Checks=["check_a"],
|
||||
Tactics=["Initial Access"],
|
||||
SubTechniques=[],
|
||||
Platforms=["IaaS"],
|
||||
TechniqueURL="https://attack.mitre.org/techniques/T1190/",
|
||||
),
|
||||
]
|
||||
fw = _make_framework(reqs, None, TableConfig(GroupBy="_Tactics"))
|
||||
|
||||
findings = [
|
||||
_make_finding("check_a", "PASS", {"TestFW-1.0": ["T1190"]}),
|
||||
]
|
||||
filepath = str(tmp_path / "test.csv")
|
||||
|
||||
output = UniversalComplianceOutput(
|
||||
findings=findings,
|
||||
framework=fw,
|
||||
file_path=filepath,
|
||||
)
|
||||
|
||||
assert len(output.data) == 1
|
||||
row_dict = output.data[0].dict()
|
||||
assert "Requirements_Tactics" in row_dict
|
||||
assert row_dict["Requirements_Tactics"] == "Initial Access"
|
||||
assert "Requirements_TechniqueURL" in row_dict
|
||||
|
||||
|
||||
class TestCSVFileWrite:
|
||||
def test_batch_write(self, tmp_path):
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.1",
|
||||
Description="test",
|
||||
Attributes={"Section": "IAM"},
|
||||
Checks=["check_a"],
|
||||
),
|
||||
]
|
||||
metadata = [
|
||||
AttributeMetadata(Key="Section", Type="str"),
|
||||
]
|
||||
fw = _make_framework(reqs, metadata, TableConfig(GroupBy="Section"))
|
||||
|
||||
findings = [
|
||||
_make_finding("check_a", "PASS", {"TestFW-1.0": ["1.1"]}),
|
||||
]
|
||||
filepath = str(tmp_path / "test.csv")
|
||||
|
||||
output = UniversalComplianceOutput(
|
||||
findings=findings,
|
||||
framework=fw,
|
||||
file_path=filepath,
|
||||
)
|
||||
output.batch_write_data_to_file()
|
||||
|
||||
# Verify file was created and has content
|
||||
with open(filepath, "r") as f:
|
||||
content = f.read()
|
||||
assert "PROVIDER" in content # Headers are uppercase
|
||||
assert "REQUIREMENTS_ATTRIBUTES_SECTION" in content
|
||||
assert "IAM" in content
|
||||
|
||||
|
||||
class TestNoFindings:
|
||||
def test_empty_findings_no_data(self, tmp_path):
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.1",
|
||||
Description="test",
|
||||
Attributes={"Section": "IAM"},
|
||||
Checks=["check_a"],
|
||||
),
|
||||
]
|
||||
fw = _make_framework(reqs, None, TableConfig(GroupBy="Section"))
|
||||
filepath = str(tmp_path / "test.csv")
|
||||
|
||||
output = UniversalComplianceOutput(
|
||||
findings=[],
|
||||
framework=fw,
|
||||
file_path=filepath,
|
||||
)
|
||||
assert len(output.data) == 0
|
||||
|
||||
|
||||
class TestMultiProviderOutput:
|
||||
def test_dict_checks_filtered_by_provider(self, tmp_path):
|
||||
"""Only checks for the given provider appear in CSV output."""
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.1",
|
||||
Description="test",
|
||||
Attributes={"Section": "IAM"},
|
||||
Checks={"aws": ["check_a"], "azure": ["check_b"]},
|
||||
),
|
||||
]
|
||||
metadata = [
|
||||
AttributeMetadata(Key="Section", Type="str"),
|
||||
]
|
||||
fw = ComplianceFramework(
|
||||
Framework="MultiCloud",
|
||||
Name="Multi",
|
||||
Version="1.0",
|
||||
Description="Test multi-provider",
|
||||
Requirements=reqs,
|
||||
AttributesMetadata=metadata,
|
||||
Outputs=OutputsConfig(Table_Config=TableConfig(GroupBy="Section")),
|
||||
)
|
||||
|
||||
findings = [
|
||||
_make_finding("check_a", "PASS", {"MultiCloud-1.0": ["1.1"]}),
|
||||
_make_finding("check_b", "FAIL", {"MultiCloud-1.0": ["1.1"]}),
|
||||
]
|
||||
filepath = str(tmp_path / "test.csv")
|
||||
|
||||
output = UniversalComplianceOutput(
|
||||
findings=findings,
|
||||
framework=fw,
|
||||
file_path=filepath,
|
||||
provider="aws",
|
||||
)
|
||||
|
||||
# Only check_a should match (it's the AWS check)
|
||||
assert len(output.data) == 1
|
||||
row_dict = output.data[0].dict()
|
||||
assert row_dict["Requirements_Attributes_Section"] == "IAM"
|
||||
|
||||
def test_no_provider_includes_all(self, tmp_path):
|
||||
"""Without provider filter, all checks from all providers are included."""
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.1",
|
||||
Description="test",
|
||||
Attributes={"Section": "IAM"},
|
||||
Checks={"aws": ["check_a"], "azure": ["check_b"]},
|
||||
),
|
||||
]
|
||||
metadata = [
|
||||
AttributeMetadata(Key="Section", Type="str"),
|
||||
]
|
||||
fw = ComplianceFramework(
|
||||
Framework="MultiCloud",
|
||||
Name="Multi",
|
||||
Version="1.0",
|
||||
Description="Test multi-provider",
|
||||
Requirements=reqs,
|
||||
AttributesMetadata=metadata,
|
||||
Outputs=OutputsConfig(Table_Config=TableConfig(GroupBy="Section")),
|
||||
)
|
||||
|
||||
findings = [
|
||||
_make_finding("check_a", "PASS", {"MultiCloud-1.0": ["1.1"]}),
|
||||
_make_finding("check_b", "FAIL", {"MultiCloud-1.0": ["1.1"]}),
|
||||
]
|
||||
filepath = str(tmp_path / "test.csv")
|
||||
|
||||
output = UniversalComplianceOutput(
|
||||
findings=findings,
|
||||
framework=fw,
|
||||
file_path=filepath,
|
||||
)
|
||||
|
||||
# Both checks should be included without provider filter
|
||||
assert len(output.data) == 2
|
||||
|
||||
def test_empty_dict_checks_is_manual(self, tmp_path):
|
||||
"""Requirement with empty dict Checks is treated as manual."""
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
Id="manual-1",
|
||||
Description="manual check",
|
||||
Attributes={"Section": "Governance"},
|
||||
Checks={},
|
||||
),
|
||||
]
|
||||
metadata = [
|
||||
AttributeMetadata(Key="Section", Type="str"),
|
||||
]
|
||||
fw = ComplianceFramework(
|
||||
Framework="MultiCloud",
|
||||
Name="Multi",
|
||||
Version="1.0",
|
||||
Description="Test",
|
||||
Requirements=reqs,
|
||||
AttributesMetadata=metadata,
|
||||
Outputs=OutputsConfig(Table_Config=TableConfig(GroupBy="Section")),
|
||||
)
|
||||
|
||||
filepath = str(tmp_path / "test.csv")
|
||||
|
||||
output = UniversalComplianceOutput(
|
||||
findings=[_make_finding("other_check", "PASS", {})],
|
||||
framework=fw,
|
||||
file_path=filepath,
|
||||
provider="aws",
|
||||
)
|
||||
|
||||
manual_rows = [r for r in output.data if r.dict()["Status"] == "MANUAL"]
|
||||
assert len(manual_rows) == 1
|
||||
assert manual_rows[0].dict()["Requirements_Id"] == "manual-1"
|
||||
|
||||
|
||||
class TestCSVExclude:
|
||||
def test_csv_false_excludes_column(self, tmp_path):
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.1",
|
||||
Description="test",
|
||||
Attributes={"Section": "IAM", "Internal": "hidden"},
|
||||
Checks=["check_a"],
|
||||
),
|
||||
]
|
||||
metadata = [
|
||||
AttributeMetadata(Key="Section", Type="str", CSV=True),
|
||||
AttributeMetadata(Key="Internal", Type="str", CSV=False),
|
||||
]
|
||||
fw = _make_framework(reqs, metadata, TableConfig(GroupBy="Section"))
|
||||
|
||||
findings = [
|
||||
_make_finding("check_a", "PASS", {"TestFW-1.0": ["1.1"]}),
|
||||
]
|
||||
filepath = str(tmp_path / "test.csv")
|
||||
|
||||
output = UniversalComplianceOutput(
|
||||
findings=findings,
|
||||
framework=fw,
|
||||
file_path=filepath,
|
||||
)
|
||||
|
||||
row_dict = output.data[0].dict()
|
||||
assert "Requirements_Attributes_Section" in row_dict
|
||||
assert "Requirements_Attributes_Internal" not in row_dict
|
||||
|
||||
|
||||
def _make_provider_finding(provider, check_id="check_a", status="PASS"):
|
||||
"""Create a mock Finding with a specific provider."""
|
||||
finding = _make_finding(check_id, status, {"TestFW-1.0": ["1.1"]})
|
||||
finding.provider = provider
|
||||
return finding
|
||||
|
||||
|
||||
def _simple_framework():
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.1",
|
||||
Description="test",
|
||||
Attributes={"Section": "IAM"},
|
||||
Checks=["check_a"],
|
||||
),
|
||||
]
|
||||
metadata = [
|
||||
AttributeMetadata(Key="Section", Type="str"),
|
||||
]
|
||||
return _make_framework(reqs, metadata, TableConfig(GroupBy="Section"))
|
||||
|
||||
|
||||
class TestProviderHeaders:
|
||||
def test_aws_headers(self, tmp_path):
|
||||
fw = _simple_framework()
|
||||
findings = [_make_provider_finding("aws")]
|
||||
output = UniversalComplianceOutput(
|
||||
findings=findings,
|
||||
framework=fw,
|
||||
file_path=str(tmp_path / "test.csv"),
|
||||
provider="aws",
|
||||
)
|
||||
row_dict = output.data[0].dict()
|
||||
assert "AccountId" in row_dict
|
||||
assert "Region" in row_dict
|
||||
assert row_dict["AccountId"] == "123456789012"
|
||||
assert row_dict["Region"] == "us-east-1"
|
||||
|
||||
def test_azure_headers(self, tmp_path):
|
||||
fw = _simple_framework()
|
||||
findings = [_make_provider_finding("azure")]
|
||||
output = UniversalComplianceOutput(
|
||||
findings=findings,
|
||||
framework=fw,
|
||||
file_path=str(tmp_path / "test.csv"),
|
||||
provider="azure",
|
||||
)
|
||||
row_dict = output.data[0].dict()
|
||||
assert "SubscriptionId" in row_dict
|
||||
assert "Location" in row_dict
|
||||
assert row_dict["SubscriptionId"] == "123456789012"
|
||||
assert row_dict["Location"] == "us-east-1"
|
||||
|
||||
def test_gcp_headers(self, tmp_path):
|
||||
fw = _simple_framework()
|
||||
findings = [_make_provider_finding("gcp")]
|
||||
output = UniversalComplianceOutput(
|
||||
findings=findings,
|
||||
framework=fw,
|
||||
file_path=str(tmp_path / "test.csv"),
|
||||
provider="gcp",
|
||||
)
|
||||
row_dict = output.data[0].dict()
|
||||
assert "ProjectId" in row_dict
|
||||
assert "Location" in row_dict
|
||||
assert row_dict["ProjectId"] == "123456789012"
|
||||
|
||||
def test_kubernetes_headers(self, tmp_path):
|
||||
fw = _simple_framework()
|
||||
findings = [_make_provider_finding("kubernetes")]
|
||||
output = UniversalComplianceOutput(
|
||||
findings=findings,
|
||||
framework=fw,
|
||||
file_path=str(tmp_path / "test.csv"),
|
||||
provider="kubernetes",
|
||||
)
|
||||
row_dict = output.data[0].dict()
|
||||
assert "Context" in row_dict
|
||||
assert "Namespace" in row_dict
|
||||
# Kubernetes Context maps to account_name
|
||||
assert row_dict["Context"] == "test-account"
|
||||
assert row_dict["Namespace"] == "us-east-1"
|
||||
|
||||
def test_github_headers(self, tmp_path):
|
||||
fw = _simple_framework()
|
||||
findings = [_make_provider_finding("github")]
|
||||
output = UniversalComplianceOutput(
|
||||
findings=findings,
|
||||
framework=fw,
|
||||
file_path=str(tmp_path / "test.csv"),
|
||||
provider="github",
|
||||
)
|
||||
row_dict = output.data[0].dict()
|
||||
assert "Account_Name" in row_dict
|
||||
assert "Account_Id" in row_dict
|
||||
# GitHub: Account_Name (pos 3) from account_name, Account_Id (pos 4) from account_uid
|
||||
assert row_dict["Account_Name"] == "test-account"
|
||||
assert row_dict["Account_Id"] == "123456789012"
|
||||
# Verify column order matches legacy (Account_Name before Account_Id)
|
||||
keys = list(row_dict.keys())
|
||||
assert keys.index("Account_Name") < keys.index("Account_Id")
|
||||
|
||||
def test_unknown_provider_defaults(self, tmp_path):
|
||||
fw = _simple_framework()
|
||||
findings = [_make_provider_finding("unknown")]
|
||||
output = UniversalComplianceOutput(
|
||||
findings=findings,
|
||||
framework=fw,
|
||||
file_path=str(tmp_path / "test.csv"),
|
||||
provider="unknown",
|
||||
)
|
||||
row_dict = output.data[0].dict()
|
||||
assert "AccountId" in row_dict
|
||||
assert "Region" in row_dict
|
||||
|
||||
def test_none_provider_defaults(self, tmp_path):
|
||||
fw = _simple_framework()
|
||||
findings = [_make_provider_finding("aws")]
|
||||
output = UniversalComplianceOutput(
|
||||
findings=findings,
|
||||
framework=fw,
|
||||
file_path=str(tmp_path / "test.csv"),
|
||||
)
|
||||
row_dict = output.data[0].dict()
|
||||
assert "AccountId" in row_dict
|
||||
assert "Region" in row_dict
|
||||
|
||||
def test_csv_write_azure_headers(self, tmp_path):
|
||||
fw = _simple_framework()
|
||||
findings = [_make_provider_finding("azure")]
|
||||
filepath = str(tmp_path / "test.csv")
|
||||
output = UniversalComplianceOutput(
|
||||
findings=findings,
|
||||
framework=fw,
|
||||
file_path=filepath,
|
||||
provider="azure",
|
||||
)
|
||||
output.batch_write_data_to_file()
|
||||
|
||||
with open(filepath, "r") as f:
|
||||
content = f.read()
|
||||
assert "SUBSCRIPTIONID" in content
|
||||
assert "LOCATION" in content
|
||||
# Should NOT have the default AccountId/Region headers
|
||||
assert "ACCOUNTID" not in content
|
||||
|
||||
def test_column_order_matches_legacy(self, tmp_path):
|
||||
"""Verify that the base column order matches the legacy per-provider models.
|
||||
|
||||
Legacy models all define: Provider, Description, <col3>, <col4>, AssessmentDate, ...
|
||||
The universal output must preserve this exact order for backward compatibility.
|
||||
"""
|
||||
# Expected column order per provider (positions 3 and 4 after Provider, Description)
|
||||
legacy_order = {
|
||||
"aws": ("AccountId", "Region"),
|
||||
"azure": ("SubscriptionId", "Location"),
|
||||
"gcp": ("ProjectId", "Location"),
|
||||
"kubernetes": ("Context", "Namespace"),
|
||||
"m365": ("TenantId", "Location"),
|
||||
"github": ("Account_Name", "Account_Id"),
|
||||
"oraclecloud": ("TenancyId", "Region"),
|
||||
"alibabacloud": ("AccountId", "Region"),
|
||||
"nhn": ("AccountId", "Region"),
|
||||
}
|
||||
|
||||
for provider_name, (expected_col3, expected_col4) in legacy_order.items():
|
||||
fw = _simple_framework()
|
||||
findings = [_make_provider_finding(provider_name)]
|
||||
output = UniversalComplianceOutput(
|
||||
findings=findings,
|
||||
framework=fw,
|
||||
file_path=str(tmp_path / f"test_{provider_name}.csv"),
|
||||
provider=provider_name,
|
||||
)
|
||||
keys = list(output.data[0].dict().keys())
|
||||
assert keys[0] == "Provider", f"{provider_name}: col 1 should be Provider"
|
||||
assert (
|
||||
keys[1] == "Description"
|
||||
), f"{provider_name}: col 2 should be Description"
|
||||
assert (
|
||||
keys[2] == expected_col3
|
||||
), f"{provider_name}: col 3 should be {expected_col3}, got {keys[2]}"
|
||||
assert (
|
||||
keys[3] == expected_col4
|
||||
), f"{provider_name}: col 4 should be {expected_col4}, got {keys[3]}"
|
||||
assert (
|
||||
keys[4] == "AssessmentDate"
|
||||
), f"{provider_name}: col 5 should be AssessmentDate"
|
||||
384
tests/lib/outputs/compliance/universal/universal_table_test.py
Normal file
384
tests/lib/outputs/compliance/universal/universal_table_test.py
Normal file
@@ -0,0 +1,384 @@
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from prowler.lib.check.compliance_models import (
|
||||
ComplianceFramework,
|
||||
OutputsConfig,
|
||||
ScoringConfig,
|
||||
SplitByConfig,
|
||||
TableConfig,
|
||||
TableLabels,
|
||||
UniversalComplianceRequirement,
|
||||
)
|
||||
from prowler.lib.outputs.compliance.universal.universal_table import (
|
||||
_build_requirement_check_map,
|
||||
_get_group_key,
|
||||
get_universal_table,
|
||||
)
|
||||
|
||||
|
||||
def _make_finding(check_id, status="PASS", muted=False):
|
||||
"""Create a mock finding for table tests."""
|
||||
finding = SimpleNamespace()
|
||||
finding.check_metadata = SimpleNamespace(CheckID=check_id)
|
||||
finding.status = status
|
||||
finding.muted = muted
|
||||
return finding
|
||||
|
||||
|
||||
def _make_framework(requirements, table_config, provider="AWS"):
|
||||
return ComplianceFramework(
|
||||
Framework="TestFW",
|
||||
Name="Test Framework",
|
||||
Provider=provider,
|
||||
Version="1.0",
|
||||
Description="Test",
|
||||
Requirements=requirements,
|
||||
Outputs=OutputsConfig(Table_Config=table_config) if table_config else None,
|
||||
)
|
||||
|
||||
|
||||
class TestBuildRequirementCheckMap:
|
||||
def test_basic(self):
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.1",
|
||||
Description="test",
|
||||
Attributes={"Section": "IAM"},
|
||||
Checks=["check_a", "check_b"],
|
||||
),
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.2",
|
||||
Description="test2",
|
||||
Attributes={"Section": "IAM"},
|
||||
Checks=["check_b", "check_c"],
|
||||
),
|
||||
]
|
||||
fw = _make_framework(reqs, TableConfig(GroupBy="Section"))
|
||||
check_map = _build_requirement_check_map(fw)
|
||||
assert "check_a" in check_map
|
||||
assert len(check_map["check_b"]) == 2
|
||||
assert "check_c" in check_map
|
||||
|
||||
def test_dict_checks_no_provider_filter(self):
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.1",
|
||||
Description="test",
|
||||
Attributes={"Section": "IAM"},
|
||||
Checks={"aws": ["check_a"], "azure": ["check_b"]},
|
||||
),
|
||||
]
|
||||
fw = _make_framework(reqs, TableConfig(GroupBy="Section"))
|
||||
check_map = _build_requirement_check_map(fw)
|
||||
assert "check_a" in check_map
|
||||
assert "check_b" in check_map
|
||||
|
||||
def test_dict_checks_filtered_by_provider(self):
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.1",
|
||||
Description="test",
|
||||
Attributes={"Section": "IAM"},
|
||||
Checks={"aws": ["check_a"], "azure": ["check_b"]},
|
||||
),
|
||||
]
|
||||
fw = _make_framework(reqs, TableConfig(GroupBy="Section"))
|
||||
check_map = _build_requirement_check_map(fw, provider="aws")
|
||||
assert "check_a" in check_map
|
||||
assert "check_b" not in check_map
|
||||
|
||||
def test_dict_checks_provider_not_present(self):
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.1",
|
||||
Description="test",
|
||||
Attributes={"Section": "IAM"},
|
||||
Checks={"aws": ["check_a"], "azure": ["check_b"]},
|
||||
),
|
||||
]
|
||||
fw = _make_framework(reqs, TableConfig(GroupBy="Section"))
|
||||
check_map = _build_requirement_check_map(fw, provider="gcp")
|
||||
assert len(check_map) == 0
|
||||
|
||||
|
||||
class TestGetGroupKey:
|
||||
def test_normal_field(self):
|
||||
req = UniversalComplianceRequirement(
|
||||
Id="1.1",
|
||||
Description="test",
|
||||
Attributes={"Section": "IAM"},
|
||||
Checks=[],
|
||||
)
|
||||
assert _get_group_key(req, "Section") == ["IAM"]
|
||||
|
||||
def test_tactics(self):
|
||||
req = UniversalComplianceRequirement(
|
||||
Id="T1190",
|
||||
Description="test",
|
||||
Attributes={},
|
||||
Checks=[],
|
||||
Tactics=["Initial Access", "Execution"],
|
||||
)
|
||||
assert _get_group_key(req, "_Tactics") == ["Initial Access", "Execution"]
|
||||
|
||||
|
||||
class TestGroupedMode:
|
||||
def test_grouped_rendering(self, capsys):
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.1",
|
||||
Description="test",
|
||||
Attributes={"Section": "IAM"},
|
||||
Checks=["check_a"],
|
||||
),
|
||||
UniversalComplianceRequirement(
|
||||
Id="2.1",
|
||||
Description="test2",
|
||||
Attributes={"Section": "Logging"},
|
||||
Checks=["check_b"],
|
||||
),
|
||||
]
|
||||
tc = TableConfig(GroupBy="Section")
|
||||
fw = _make_framework(reqs, tc)
|
||||
|
||||
findings = [
|
||||
_make_finding("check_a", "PASS"),
|
||||
_make_finding("check_b", "FAIL"),
|
||||
]
|
||||
bulk_metadata = {
|
||||
"check_a": MagicMock(Compliance=[]),
|
||||
"check_b": MagicMock(Compliance=[]),
|
||||
}
|
||||
|
||||
get_universal_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"test_fw",
|
||||
"output",
|
||||
"/tmp",
|
||||
False,
|
||||
framework=fw,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "IAM" in captured.out
|
||||
assert "Logging" in captured.out
|
||||
assert "PASS" in captured.out
|
||||
assert "FAIL" in captured.out
|
||||
|
||||
|
||||
class TestSplitMode:
|
||||
def test_split_rendering(self, capsys):
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.1",
|
||||
Description="test",
|
||||
Attributes={"Section": "Storage", "Profile": "Level 1"},
|
||||
Checks=["check_a"],
|
||||
),
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.2",
|
||||
Description="test2",
|
||||
Attributes={"Section": "Storage", "Profile": "Level 2"},
|
||||
Checks=["check_b"],
|
||||
),
|
||||
]
|
||||
tc = TableConfig(
|
||||
GroupBy="Section",
|
||||
SplitBy=SplitByConfig(Field="Profile", Values=["Level 1", "Level 2"]),
|
||||
)
|
||||
fw = _make_framework(reqs, tc)
|
||||
|
||||
findings = [
|
||||
_make_finding("check_a", "PASS"),
|
||||
_make_finding("check_b", "FAIL"),
|
||||
]
|
||||
bulk_metadata = {
|
||||
"check_a": MagicMock(Compliance=[]),
|
||||
"check_b": MagicMock(Compliance=[]),
|
||||
}
|
||||
|
||||
get_universal_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"test_fw",
|
||||
"output",
|
||||
"/tmp",
|
||||
False,
|
||||
framework=fw,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "Storage" in captured.out
|
||||
assert "Level 1" in captured.out
|
||||
assert "Level 2" in captured.out
|
||||
|
||||
|
||||
class TestScoredMode:
|
||||
def test_scored_rendering(self, capsys):
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.1",
|
||||
Description="test",
|
||||
Attributes={"Section": "IAM", "LevelOfRisk": 5, "Weight": 100},
|
||||
Checks=["check_a"],
|
||||
),
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.2",
|
||||
Description="test2",
|
||||
Attributes={"Section": "IAM", "LevelOfRisk": 3, "Weight": 50},
|
||||
Checks=["check_b"],
|
||||
),
|
||||
]
|
||||
tc = TableConfig(
|
||||
GroupBy="Section",
|
||||
Scoring=ScoringConfig(RiskField="LevelOfRisk", WeightField="Weight"),
|
||||
)
|
||||
fw = _make_framework(reqs, tc)
|
||||
|
||||
findings = [
|
||||
_make_finding("check_a", "PASS"),
|
||||
_make_finding("check_b", "FAIL"),
|
||||
]
|
||||
bulk_metadata = {
|
||||
"check_a": MagicMock(Compliance=[]),
|
||||
"check_b": MagicMock(Compliance=[]),
|
||||
}
|
||||
|
||||
get_universal_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"test_fw",
|
||||
"output",
|
||||
"/tmp",
|
||||
False,
|
||||
framework=fw,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "IAM" in captured.out
|
||||
assert "Score" in captured.out
|
||||
assert "Threat Score" in captured.out
|
||||
|
||||
|
||||
class TestCustomLabels:
|
||||
def test_ens_spanish_labels(self, capsys):
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.1",
|
||||
Description="test",
|
||||
Attributes={"Marco": "operacional"},
|
||||
Checks=["check_a"],
|
||||
),
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.2",
|
||||
Description="test2",
|
||||
Attributes={"Marco": "organizativo"},
|
||||
Checks=["check_b"],
|
||||
),
|
||||
]
|
||||
tc = TableConfig(
|
||||
GroupBy="Marco",
|
||||
Labels=TableLabels(
|
||||
PassLabel="CUMPLE",
|
||||
FailLabel="NO CUMPLE",
|
||||
ProviderHeader="Proveedor",
|
||||
Title="Estado de Cumplimiento",
|
||||
),
|
||||
)
|
||||
fw = _make_framework(reqs, tc)
|
||||
|
||||
findings = [_make_finding("check_a", "PASS"), _make_finding("check_b", "FAIL")]
|
||||
bulk_metadata = {
|
||||
"check_a": MagicMock(Compliance=[]),
|
||||
"check_b": MagicMock(Compliance=[]),
|
||||
}
|
||||
|
||||
get_universal_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"test_fw",
|
||||
"output",
|
||||
"/tmp",
|
||||
False,
|
||||
framework=fw,
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "CUMPLE" in captured.out
|
||||
assert "Estado de Cumplimiento" in captured.out
|
||||
|
||||
|
||||
class TestMultiProviderDictChecks:
|
||||
def test_only_aws_checks_matched(self, capsys):
|
||||
"""With dict Checks and provider='aws', only AWS checks match findings."""
|
||||
reqs = [
|
||||
UniversalComplianceRequirement(
|
||||
Id="1.1",
|
||||
Description="test",
|
||||
Attributes={"Section": "IAM"},
|
||||
Checks={"aws": ["check_a"], "azure": ["check_b"]},
|
||||
),
|
||||
UniversalComplianceRequirement(
|
||||
Id="2.1",
|
||||
Description="test2",
|
||||
Attributes={"Section": "Logging"},
|
||||
Checks={"aws": ["check_c"], "gcp": ["check_d"]},
|
||||
),
|
||||
]
|
||||
tc = TableConfig(GroupBy="Section")
|
||||
fw = ComplianceFramework(
|
||||
Framework="MultiCloud",
|
||||
Name="Multi",
|
||||
Description="Test",
|
||||
Requirements=reqs,
|
||||
Outputs=OutputsConfig(Table_Config=tc),
|
||||
)
|
||||
|
||||
findings = [
|
||||
_make_finding("check_a", "PASS"),
|
||||
_make_finding("check_b", "FAIL"), # Azure check, should be ignored
|
||||
_make_finding("check_c", "PASS"),
|
||||
]
|
||||
bulk_metadata = {
|
||||
"check_a": MagicMock(Compliance=[]),
|
||||
"check_b": MagicMock(Compliance=[]),
|
||||
"check_c": MagicMock(Compliance=[]),
|
||||
}
|
||||
|
||||
get_universal_table(
|
||||
findings,
|
||||
bulk_metadata,
|
||||
"multi_cloud",
|
||||
"output",
|
||||
"/tmp",
|
||||
False,
|
||||
framework=fw,
|
||||
provider="aws",
|
||||
)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "IAM" in captured.out
|
||||
assert "Logging" in captured.out
|
||||
# check_b (azure) should not have been counted as FAIL for AWS
|
||||
assert "PASS" in captured.out
|
||||
|
||||
|
||||
class TestNoTableConfig:
|
||||
def test_returns_early_without_table_config(self, capsys):
|
||||
fw = ComplianceFramework(
|
||||
Framework="TestFW",
|
||||
Name="Test",
|
||||
Provider="AWS",
|
||||
Description="Test",
|
||||
Requirements=[],
|
||||
)
|
||||
get_universal_table([], {}, "test", "out", "/tmp", False, framework=fw)
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == ""
|
||||
|
||||
def test_returns_early_without_framework(self, capsys):
|
||||
get_universal_table([], {}, "test", "out", "/tmp", False, framework=None)
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == ""
|
||||
Reference in New Issue
Block a user