Compare commits

...

1 Commit

Author SHA1 Message Date
pedrooot
0766a4e1dd feat(sdk): add universal compliance schema models and loaders 2026-03-10 17:37:06 +01:00
2 changed files with 1532 additions and 1 deletion

View File

@@ -1,9 +1,10 @@
import json
import os
import sys
from enum import Enum
from typing import Optional, Union
from pydantic.v1 import BaseModel, ValidationError, root_validator
from pydantic.v1 import BaseModel, Field, ValidationError, root_validator
from prowler.lib.check.utils import list_compliance_modules
from prowler.lib.logger import logger
@@ -429,3 +430,491 @@ def load_compliance_framework(
sys.exit(1)
else:
return compliance_framework
# ─── Universal Compliance Schema Models (Phase 1-3) ─────────────────────────
class AttributeMetadata(BaseModel):
    """Schema descriptor for a single attribute field in a universal compliance framework.

    One entry per key that may appear in a Requirement's ``Attributes``
    dict; the framework-level validator enforces Required/Enum/Type.
    """

    # Attribute key as it appears in each Requirement's Attributes dict.
    Key: str
    # Human-readable label for the attribute.
    Label: Optional[str] = None
    # Value type: str, int, float, list_str, list_dict, bool
    Type: str = "str"
    # Allowed values when the attribute is enum-constrained.
    Enum: Optional[list] = None
    # Whether the attribute is exported to CSV output.
    CSV: bool = True
    # Whether the attribute is exported to OCSF output.
    OCSF: bool = True
    # When True, every Requirement must carry a non-None value for Key.
    Required: bool = False
    # enum_value -> EnumValueDisplay dict
    EnumDisplay: Optional[dict] = None
    # explicit ordering of enum values
    EnumOrder: Optional[list] = None
    # axis label when used in charts
    ChartLabel: Optional[str] = None
class SplitByConfig(BaseModel):
    """Column-splitting configuration (e.g. CIS Level 1/Level 2)."""

    # Attribute key whose values drive the split.
    Field: str
    # The values of Field that each get their own split.
    Values: list
class ScoringConfig(BaseModel):
    """Weighted scoring configuration (e.g. ThreatScore)."""

    # Attribute key holding the requirement's risk level.
    RiskField: str
    # Attribute key holding the requirement's weight.
    WeightField: str
class TableLabels(BaseModel):
    """Custom pass/fail labels for console table rendering."""

    # Text shown for passing / failing requirements.
    PassLabel: str = "PASS"
    FailLabel: str = "FAIL"
    # Column headers; GroupHeader is optional.
    ProviderHeader: str = "Provider"
    GroupHeader: Optional[str] = None
    StatusHeader: str = "Status"
    # Optional table title and results-section title.
    Title: Optional[str] = None
    ResultsTitle: Optional[str] = None
    # Optional note rendered with the table.
    FooterNote: Optional[str] = None
class TableConfig(BaseModel):
    """Declarative rendering instructions for the console compliance table."""

    # Attribute key the table rows are grouped by.
    GroupBy: str
    # Optional column split (e.g. CIS Level 1 / Level 2).
    SplitBy: Optional[SplitByConfig] = None
    # Optional weighted scoring (e.g. ThreatScore).
    Scoring: Optional[ScoringConfig] = None
    # Optional label overrides for the rendered table.
    Labels: Optional[TableLabels] = None
class EnumValueDisplay(BaseModel):
    """Per-enum-value visual metadata for PDF rendering.

    Replaces hardcoded DIMENSION_MAPPING, TIPO_ICONS, nivel colors.
    """

    # Display label, e.g. "Trazabilidad".
    Label: Optional[str] = None
    # Short form, e.g. "T".
    Abbreviation: Optional[str] = None
    # Hex color string, e.g. "#4286F4".
    Color: Optional[str] = None
    # Emoji icon.
    Icon: Optional[str] = None
class ChartConfig(BaseModel):
    """Declarative chart description for PDF reports."""

    # Chart identifier.
    Id: str
    # Chart shape: vertical_bar | horizontal_bar | radar
    Type: str
    # Attribute key to group requirements by.
    GroupBy: str
    Title: Optional[str] = None
    XLabel: Optional[str] = None
    YLabel: Optional[str] = None
    # Metric plotted for each group.
    ValueSource: str = "compliance_percent"
    # Color strategy: by_value | fixed | by_group
    ColorMode: str = "by_value"
    # Hex color used when ColorMode is "fixed" — TODO confirm in renderer.
    FixedColor: Optional[str] = None
class ScoringFormula(BaseModel):
    """Weighted scoring formula (e.g. ThreatScore)."""

    # Attribute key holding the risk level, e.g. "LevelOfRisk".
    RiskField: str
    # Attribute key holding the weight, e.g. "Weight".
    WeightField: str
    # Risk multiplier: rfac = 1 + factor * risk_level
    RiskBoostFactor: float = 0.25
class CriticalRequirementsFilter(BaseModel):
    """Filter for critical requirements section in PDF reports.

    Exactly one of MinValue (numeric threshold) or FilterValue (string
    match) is expected to be set — not enforced here; confirm in the
    report generator.
    """

    # Attribute key to filter on, e.g. "LevelOfRisk".
    FilterField: str
    # Numeric threshold for int-based filters, e.g. 4.
    MinValue: Optional[int] = None
    # Exact string for string-based filters, e.g. "alto".
    FilterValue: Optional[str] = None
    # Requirement status the section is restricted to.
    StatusFilter: str = "FAIL"
    # Section title, e.g. "Critical Failed Requirements".
    Title: Optional[str] = None
class ReportFilter(BaseModel):
    """Default report filtering for PDF generation."""

    # Whether the report is limited to failed requirements.
    OnlyFailed: bool = True
    # Whether manual requirements are included.
    IncludeManual: bool = False
class I18nLabels(BaseModel):
    """Localized labels for PDF report rendering.

    Defaults are English; a framework JSON may override any label.
    """

    ReportTitle: Optional[str] = None
    PageLabel: str = "Page"
    PoweredBy: str = "Powered by Prowler"
    FrameworkLabel: str = "Framework:"
    VersionLabel: str = "Version:"
    ProviderLabel: str = "Provider:"
    DescriptionLabel: str = "Description:"
    ComplianceScoreLabel: str = "Compliance Score by Sections"
    RequirementsIndexLabel: str = "Requirements Index"
    DetailedFindingsLabel: str = "Detailed Findings"
class PDFConfig(BaseModel):
    """Declarative PDF report configuration.

    Drives the API report generator from JSON data instead of hardcoded
    Python config. Colors are hex strings (e.g. '#336699').
    """

    # Report language code, e.g. "en".
    Language: str = "en"
    LogoFilename: Optional[str] = None
    PrimaryColor: Optional[str] = None
    SecondaryColor: Optional[str] = None
    BgColor: Optional[str] = None
    # Report sections; presumably ordered section identifiers — confirm in generator.
    Sections: Optional[list] = None
    # Mapping of section -> abbreviated name.
    SectionShortNames: Optional[dict] = None
    # Attribute keys used to group / sub-group requirements.
    GroupByField: Optional[str] = None
    SubGroupByField: Optional[str] = None
    # Mapping of section -> display title.
    SectionTitles: Optional[dict] = None
    # ChartConfig-shaped entries describing report charts.
    Charts: Optional[list] = None
    # Optional weighted scoring formula (e.g. ThreatScore).
    Scoring: Optional[ScoringFormula] = None
    # Optional "critical requirements" section filter.
    CriticalFilter: Optional[CriticalRequirementsFilter] = None
    # Default report filtering (failed-only, manual inclusion).
    Filter: Optional[ReportFilter] = None
    # Localized label overrides.
    Labels: Optional[I18nLabels] = None
class UniversalComplianceRequirement(BaseModel):
    """Universal requirement with flat dict-based attributes."""

    Id: str
    Description: str
    Name: Optional[str] = None
    # Flat attribute dict, validated against the framework's AttributesMetadata.
    Attributes: dict = Field(default_factory=dict)
    # Either a plain list of checks (single provider) or a dict keyed by
    # provider name (multi-provider frameworks).
    Checks: Union[list, dict] = Field(default_factory=list)
    # MITRE-specific fields promoted to the top level by the legacy
    # adapter; None for non-MITRE frameworks.
    Tactics: Optional[list] = None
    SubTechniques: Optional[list] = None
    Platforms: Optional[list] = None
    TechniqueURL: Optional[str] = None
class OutputsConfig(BaseModel):
    """Container for output-related configuration (table, PDF, etc.)."""

    class Config:
        # Allow populating by the Python field name (Table_Config /
        # PDF_Config) as well as by the JSON alias.
        allow_population_by_field_name = True

    # Console table configuration (JSON key: "TableConfig").
    Table_Config: Optional[TableConfig] = Field(None, alias="TableConfig")
    # PDF report configuration (JSON key: "PDFConfig").
    PDF_Config: Optional[PDFConfig] = Field(None, alias="PDFConfig")
class ComplianceFramework(BaseModel):
    """Universal top-level container for any compliance framework.

    Provider may be explicit (single-provider JSON) or derived from Checks
    keys when Checks is a dict keyed by provider.
    """

    # Framework identifier (e.g. short name).
    Framework: str
    Name: str
    # Explicit provider for single-provider frameworks; None when Checks
    # dicts are keyed by provider instead.
    Provider: Optional[str] = None
    Version: Optional[str] = None
    Description: str
    Icon: Optional[str] = None
    Requirements: list[UniversalComplianceRequirement]
    # Schema for each Requirement's Attributes dict; when present it is
    # enforced by validate_attributes_against_metadata below.
    AttributesMetadata: Optional[list[AttributeMetadata]] = None
    Outputs: Optional[OutputsConfig] = None

    @root_validator(pre=True)
    # noqa: F841 - since vulture raises unused variable 'cls'
    def migrate_legacy_output_fields(cls, values):  # noqa: F841
        """Move top-level TableConfig/PDFConfig into Outputs for backward compat."""
        # Pop the legacy top-level keys so they never reach field validation.
        tc = values.pop("TableConfig", None)
        pc = values.pop("PDFConfig", None)
        if tc is not None or pc is not None:
            outputs = values.get("Outputs") or {}
            # Normalize an already-built OutputsConfig back to a plain dict
            # so the legacy values can be merged in.
            if isinstance(outputs, OutputsConfig):
                outputs = outputs.dict()
            # Explicit Outputs entries win over the legacy top-level ones.
            if tc is not None and "TableConfig" not in outputs:
                outputs["TableConfig"] = tc
            if pc is not None and "PDFConfig" not in outputs:
                outputs["PDFConfig"] = pc
            values["Outputs"] = outputs
        return values

    @root_validator
    # noqa: F841 - since vulture raises unused variable 'cls'
    def validate_attributes_against_metadata(cls, values):  # noqa: F841
        """Validate every Requirement's Attributes dict against AttributesMetadata.

        Checks:
        - Required keys (Required=True) must be present in each Requirement.
        - Enum-constrained keys must have a value within the declared Enum list.
        - Basic type validation (int, float, bool) for non-None values.

        Raises:
            ValueError: listing every violation found across all requirements.
        """
        metadata = values.get("AttributesMetadata")
        requirements = values.get("Requirements", [])
        # No declared metadata -> nothing to validate against.
        if not metadata:
            return values
        # Pre-compute lookup structures once, outside the requirement loop.
        required_keys = {m.Key for m in metadata if m.Required}
        valid_keys = {m.Key for m in metadata}
        enum_map = {m.Key: m.Enum for m in metadata if m.Enum}
        type_map = {m.Key: m.Type for m in metadata}
        # Python types accepted per declared Type; "str" and the list types
        # are intentionally not checked here.
        type_checks = {
            "int": int,
            "float": (int, float),  # int is acceptable where float is declared
            "bool": bool,
        }
        # Collect every violation so the error reports all problems at once.
        errors = []
        for req in requirements:
            attrs = req.Attributes
            # Required keys
            for key in required_keys:
                if key not in attrs or attrs[key] is None:
                    errors.append(
                        f"Requirement '{req.Id}': missing required attribute '{key}'"
                    )
            # Enum validation
            for key, allowed in enum_map.items():
                if key in attrs and attrs[key] is not None:
                    if attrs[key] not in allowed:
                        errors.append(
                            f"Requirement '{req.Id}': attribute '{key}' value "
                            f"'{attrs[key]}' not in {allowed}"
                        )
            # Type validation for non-string types
            for key in attrs:
                # Keys without declared metadata and None values are skipped.
                if key not in valid_keys or attrs[key] is None:
                    continue
                expected_type = type_map.get(key, "str")
                py_type = type_checks.get(expected_type)
                if py_type and not isinstance(attrs[key], py_type):
                    errors.append(
                        f"Requirement '{req.Id}': attribute '{key}' expected "
                        f"type {expected_type}, got {type(attrs[key]).__name__}"
                    )
        if errors:
            detail = "\n ".join(errors)
            raise ValueError(f"AttributesMetadata validation failed:\n {detail}")
        return values

    def get_providers(self) -> list:
        """Derive the set of providers this framework supports.

        Inspects Checks keys across all requirements. Falls back to the
        explicit Provider field for single-provider frameworks.
        """
        providers = set()
        for req in self.Requirements:
            # Dict-style Checks are keyed by provider name.
            if isinstance(req.Checks, dict):
                providers.update(k.lower() for k in req.Checks.keys())
        # Only fall back to the explicit Provider when no dict-style
        # Checks contributed any provider.
        if self.Provider and not providers:
            providers.add(self.Provider.lower())
        return sorted(providers)

    def supports_provider(self, provider: str) -> bool:
        """Return True if this framework has checks for the given provider."""
        provider_lower = provider.lower()
        for req in self.Requirements:
            if isinstance(req.Checks, dict):
                # Dict-style Checks: case-insensitive match on provider key.
                if provider_lower in (k.lower() for k in req.Checks.keys()):
                    return True
            elif isinstance(req.Checks, list) and req.Checks:
                # List-style checks: rely on explicit Provider field
                if self.Provider and self.Provider.lower() == provider_lower:
                    return True
        return False
# ─── Legacy-to-Universal Adapter (Phase 2) ──────────────────────────────────
def _infer_attribute_metadata(legacy: Compliance) -> Optional[list[AttributeMetadata]]:
    """Introspect the first requirement's attribute model to build AttributesMetadata.

    Maps each pydantic field of the first requirement's first Attributes
    entry to an AttributeMetadata descriptor (type string, enum values,
    required flag).

    Returns None when nothing can be inferred: empty Requirements,
    MITRE-style requirements (whose special fields live at the top level,
    not inside Attributes), requirements without Attributes, or any
    introspection failure (best-effort, logged).
    """
    try:
        if not legacy.Requirements:
            return None
        first_req = legacy.Requirements[0]
        # MITRE requirements have Tactics at top level, not in Attributes
        if isinstance(first_req, Mitre_Requirement):
            return None
        if not first_req.Attributes:
            return None
        sample_attr = first_req.Attributes[0]
        metadata = []
        # pydantic v1 exposes declared fields via __fields__; outer_type_
        # is the annotation with Optional[...] unwrapped.
        for field_name, field_obj in sample_attr.__fields__.items():
            field_type = field_obj.outer_type_
            type_str = "str"
            enum_values = None
            origin = getattr(field_type, "__origin__", None)
            if field_type is int:
                type_str = "int"
            elif field_type is float:
                type_str = "float"
            elif field_type is bool:
                type_str = "bool"
            elif origin is list:
                # list[...] annotation: distinguish list_dict from list_str
                # by the first type argument.
                args = getattr(field_type, "__args__", ())
                type_str = "list_dict" if args and args[0] is dict else "list_str"
            elif isinstance(field_type, type) and issubclass(field_type, Enum):
                # Enum fields are represented as strings constrained to
                # the enum's values.
                type_str = "str"
                enum_values = [e.value for e in field_type]
            metadata.append(
                AttributeMetadata(
                    Key=field_name,
                    Type=type_str,
                    Enum=enum_values,
                    Required=field_obj.required,
                )
            )
        return metadata
    except Exception as e:
        # Best-effort: inference is optional, but log why it failed
        # instead of swallowing the error silently.
        logger.warning(
            f"Could not infer attribute metadata: {e.__class__.__name__} -- {e}"
        )
        return None
def adapt_legacy_to_universal(legacy: Compliance) -> ComplianceFramework:
    """Convert a legacy Compliance object to a ComplianceFramework."""

    def _to_universal(req):
        # MITRE requirements carry special top-level fields; keep their raw
        # attribute dicts under a reserved key instead of flattening.
        if isinstance(req, Mitre_Requirement):
            raw_attrs = [attr.dict() for attr in req.Attributes]
            return UniversalComplianceRequirement(
                Id=req.Id,
                Description=req.Description,
                Name=req.Name,
                Attributes={"_raw_attributes": raw_attrs},
                Checks=req.Checks,
                Tactics=req.Tactics,
                SubTechniques=req.SubTechniques,
                Platforms=req.Platforms,
                TechniqueURL=req.TechniqueURL,
            )
        # Standard requirement: flatten the first attribute model (if any)
        # into a plain dict.
        flat_attrs = req.Attributes[0].dict() if req.Attributes else {}
        return UniversalComplianceRequirement(
            Id=req.Id,
            Description=req.Description,
            Name=req.Name,
            Attributes=flat_attrs,
            Checks=req.Checks,
        )

    return ComplianceFramework(
        Framework=legacy.Framework,
        Name=legacy.Name,
        Provider=legacy.Provider,
        Version=legacy.Version,
        Description=legacy.Description,
        Requirements=[_to_universal(req) for req in legacy.Requirements],
        AttributesMetadata=_infer_attribute_metadata(legacy),
    )
def load_compliance_framework_universal(path: str) -> Optional[ComplianceFramework]:
    """Load a compliance JSON as a ComplianceFramework.

    Handles both formats: the new universal format is detected by the
    presence of an ``AttributesMetadata`` key and parsed directly; anything
    else is parsed as a legacy ``Compliance`` object and adapted.

    Returns:
        The parsed ComplianceFramework, or None (with the error logged)
        when the file cannot be read, parsed, or validated.
    """
    try:
        # Compliance JSONs are UTF-8; don't depend on the locale encoding.
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if "AttributesMetadata" in data:
            # New universal format — parse directly
            return ComplianceFramework(**data)
        else:
            # Legacy format — parse as Compliance, then adapt
            legacy = Compliance(**data)
            return adapt_legacy_to_universal(legacy)
    except Exception as e:
        logger.error(
            f"Failed to load universal compliance framework from {path}: "
            f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}"
        )
        return None
def _load_jsons_from_dir(dir_path: str, provider: str, bulk: dict) -> None:
    """Scan *dir_path* for JSON files and add matching frameworks to *bulk*."""
    provider_lower = provider.lower()
    for entry in os.listdir(dir_path):
        full_path = os.path.join(dir_path, entry)
        # Only non-empty regular .json files are candidates.
        is_candidate = (
            os.path.isfile(full_path)
            and entry.endswith(".json")
            and os.stat(full_path).st_size > 0
        )
        if not is_candidate:
            continue
        framework_name = entry.split(".json")[0]
        # First framework seen under a given name wins.
        if framework_name in bulk:
            continue
        framework = load_compliance_framework_universal(full_path)
        if framework is None:
            continue
        # Include on explicit Provider match, or when any requirement has
        # dict-style Checks keyed by this provider.
        explicit_match = (
            framework.Provider and framework.Provider.lower() == provider_lower
        )
        if explicit_match or framework.supports_provider(provider):
            bulk[framework_name] = framework
def get_bulk_compliance_frameworks_universal(provider: str) -> dict:
    """Bulk load all compliance frameworks relevant to the given provider.

    Scans:
    1. The **top-level** ``prowler/compliance/`` directory for multi-provider
       JSONs (``Checks`` keyed by provider, no ``Provider`` field).
    2. Every **provider sub-directory** (``prowler/compliance/{p}/``) so that
       single-provider JSONs are also picked up.

    A framework is included when its explicit ``Provider`` matches
    (case-insensitive) **or** any requirement has dict-style ``Checks``
    with a key for *provider*.

    Returns:
        dict: framework name (file stem) -> ComplianceFramework. Empty on
        error (the error is logged, not raised).
    """
    # framework_name -> ComplianceFramework; sub-directories are scanned
    # before the root, and earlier entries win.
    bulk = {}
    try:
        available_modules = list_compliance_modules()
        # Resolve the compliance root once (parent of provider sub-dirs).
        compliance_root = None
        seen_paths = set()
        for module in available_modules:
            # module_finder.path is the compliance root; the last dotted
            # component of module.name is the provider sub-directory.
            dir_path = f"{module.module_finder.path}/{module.name.split('.')[-1]}"
            if not os.path.isdir(dir_path) or dir_path in seen_paths:
                continue
            seen_paths.add(dir_path)
            # Remember the root the first time we see a valid sub-dir.
            if compliance_root is None:
                compliance_root = module.module_finder.path
            _load_jsons_from_dir(dir_path, provider, bulk)
        # Also scan top-level compliance/ for provider-agnostic JSONs.
        if compliance_root and os.path.isdir(compliance_root):
            _load_jsons_from_dir(compliance_root, provider, bulk)
    except Exception as e:
        logger.error(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}")
    return bulk

File diff suppressed because it is too large Load Diff