Compare commits

...

12 Commits

Author SHA1 Message Date
pedrooot
7dc364c55d feat(api): fix tests 2025-12-18 13:24:32 +01:00
pedrooot
3b910c9c51 feat(api): update with latests changes and add tests 2025-12-18 13:09:27 +01:00
pedrooot
e06d85ff73 Merge branch 'master' into test-reporting-improvements 2025-12-18 12:39:31 +01:00
pedrooot
8cde5a1636 chore(revision): resolve comments 2025-12-18 12:33:24 +01:00
pedrooot
489454b5c6 feat(api): remove unneeded changes 2025-12-16 17:09:44 +01:00
pedrooot
0bd691839b chore(changelog): update with latest changes 2025-12-16 17:03:42 +01:00
pedrooot
9fc7ec8157 feat(api): add reporting tests 2025-12-16 13:35:46 +01:00
pedrooot
dd927f0caf chore: merge master and resolve conflicts in report.py 2025-12-16 12:47:53 +01:00
pedrooot
408156abab feat(report): add optimizations for PDF reporting 2025-12-16 12:40:23 +01:00
pedrooot
ae0ba7db1e feat(reporting): add visual improvements 2025-12-16 09:52:54 +01:00
pedrooot
a7daca2caf feat(report): add needed changes in report.py 2025-12-03 19:03:59 +01:00
pedrooot
925f4b03c1 feat(report): improve the way of reporting and adding reports 2025-12-03 18:48:22 +01:00
18 changed files with 9486 additions and 5205 deletions

View File

@@ -8,6 +8,7 @@ All notable changes to the **Prowler API** are documented in this file.
- New endpoint to retrieve and overview of the categories based on finding severities [(#9529)](https://github.com/prowler-cloud/prowler/pull/9529)
- Endpoints `GET /findings` and `GET /findings/latests` can now use the category filter [(#9529)](https://github.com/prowler-cloud/prowler/pull/9529)
- Account id, alias and provider name to PDF reporting table [(#9574)](https://github.com/prowler-cloud/prowler/pull/9574)
- Added memory optimizations for large compliance report generation [(#9444)](https://github.com/prowler-cloud/prowler/pull/9444)
### Changed
- Endpoint `GET /overviews/attack-surfaces` no longer returns the related check IDs [(#9529)](https://github.com/prowler-cloud/prowler/pull/9529)

View File

@@ -276,7 +276,7 @@ FINDINGS_MAX_DAYS_IN_RANGE = env.int("DJANGO_FINDINGS_MAX_DAYS_IN_RANGE", 7)
DJANGO_TMP_OUTPUT_DIRECTORY = env.str(
"DJANGO_TMP_OUTPUT_DIRECTORY", "/tmp/prowler_api_output"
)
DJANGO_FINDINGS_BATCH_SIZE = env.str("DJANGO_FINDINGS_BATCH_SIZE", 1000)
DJANGO_FINDINGS_BATCH_SIZE = env.int("DJANGO_FINDINGS_BATCH_SIZE", 1000)
DJANGO_OUTPUT_S3_AWS_OUTPUT_BUCKET = env.str("DJANGO_OUTPUT_S3_AWS_OUTPUT_BUCKET", "")
DJANGO_OUTPUT_S3_AWS_ACCESS_KEY_ID = env.str("DJANGO_OUTPUT_S3_AWS_ACCESS_KEY_ID", "")

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,186 @@
# Base classes and data structures
from .base import (
BaseComplianceReportGenerator,
ComplianceData,
RequirementData,
create_pdf_styles,
get_requirement_metadata,
)
# Chart functions
from .charts import (
create_horizontal_bar_chart,
create_pie_chart,
create_radar_chart,
create_stacked_bar_chart,
create_vertical_bar_chart,
get_chart_color_for_percentage,
)
# Reusable components
# Reusable components: Color helpers, Badge components, Risk component,
# Table components, Section components
from .components import (
ColumnConfig,
create_badge,
create_data_table,
create_findings_table,
create_info_table,
create_multi_badge_row,
create_risk_component,
create_section_header,
create_status_badge,
create_summary_table,
get_color_for_compliance,
get_color_for_risk_level,
get_color_for_weight,
get_status_color,
)
# Framework configuration: Main configuration, Color constants, ENS colors,
# NIS2 colors, Chart colors, ENS constants, Section constants, Layout constants
from .config import (
CHART_COLOR_BLUE,
CHART_COLOR_GREEN_1,
CHART_COLOR_GREEN_2,
CHART_COLOR_ORANGE,
CHART_COLOR_RED,
CHART_COLOR_YELLOW,
COL_WIDTH_LARGE,
COL_WIDTH_MEDIUM,
COL_WIDTH_SMALL,
COL_WIDTH_XLARGE,
COL_WIDTH_XXLARGE,
COLOR_BG_BLUE,
COLOR_BG_LIGHT_BLUE,
COLOR_BLUE,
COLOR_DARK_GRAY,
COLOR_ENS_ALTO,
COLOR_ENS_BAJO,
COLOR_ENS_MEDIO,
COLOR_ENS_OPCIONAL,
COLOR_GRAY,
COLOR_HIGH_RISK,
COLOR_LIGHT_BLUE,
COLOR_LIGHT_GRAY,
COLOR_LIGHTER_BLUE,
COLOR_LOW_RISK,
COLOR_MEDIUM_RISK,
COLOR_NIS2_PRIMARY,
COLOR_NIS2_SECONDARY,
COLOR_PROWLER_DARK_GREEN,
COLOR_SAFE,
COLOR_WHITE,
DIMENSION_KEYS,
DIMENSION_MAPPING,
DIMENSION_NAMES,
ENS_NIVEL_ORDER,
ENS_TIPO_ORDER,
FRAMEWORK_REGISTRY,
NIS2_SECTION_TITLES,
NIS2_SECTIONS,
PADDING_LARGE,
PADDING_MEDIUM,
PADDING_SMALL,
PADDING_XLARGE,
THREATSCORE_SECTIONS,
TIPO_ICONS,
FrameworkConfig,
get_framework_config,
)
# Framework-specific generators
from .ens import ENSReportGenerator
from .nis2 import NIS2ReportGenerator
from .threatscore import ThreatScoreReportGenerator
__all__ = [
# Base classes
"BaseComplianceReportGenerator",
"ComplianceData",
"RequirementData",
"create_pdf_styles",
"get_requirement_metadata",
# Framework-specific generators
"ThreatScoreReportGenerator",
"ENSReportGenerator",
"NIS2ReportGenerator",
# Configuration
"FrameworkConfig",
"FRAMEWORK_REGISTRY",
"get_framework_config",
# Color constants
"COLOR_BLUE",
"COLOR_LIGHT_BLUE",
"COLOR_LIGHTER_BLUE",
"COLOR_BG_BLUE",
"COLOR_BG_LIGHT_BLUE",
"COLOR_GRAY",
"COLOR_LIGHT_GRAY",
"COLOR_DARK_GRAY",
"COLOR_WHITE",
"COLOR_HIGH_RISK",
"COLOR_MEDIUM_RISK",
"COLOR_LOW_RISK",
"COLOR_SAFE",
"COLOR_PROWLER_DARK_GREEN",
"COLOR_ENS_ALTO",
"COLOR_ENS_MEDIO",
"COLOR_ENS_BAJO",
"COLOR_ENS_OPCIONAL",
"COLOR_NIS2_PRIMARY",
"COLOR_NIS2_SECONDARY",
"CHART_COLOR_BLUE",
"CHART_COLOR_GREEN_1",
"CHART_COLOR_GREEN_2",
"CHART_COLOR_YELLOW",
"CHART_COLOR_ORANGE",
"CHART_COLOR_RED",
# ENS constants
"DIMENSION_MAPPING",
"DIMENSION_NAMES",
"DIMENSION_KEYS",
"ENS_NIVEL_ORDER",
"ENS_TIPO_ORDER",
"TIPO_ICONS",
# Section constants
"THREATSCORE_SECTIONS",
"NIS2_SECTIONS",
"NIS2_SECTION_TITLES",
# Layout constants
"COL_WIDTH_SMALL",
"COL_WIDTH_MEDIUM",
"COL_WIDTH_LARGE",
"COL_WIDTH_XLARGE",
"COL_WIDTH_XXLARGE",
"PADDING_SMALL",
"PADDING_MEDIUM",
"PADDING_LARGE",
"PADDING_XLARGE",
# Color helpers
"get_color_for_risk_level",
"get_color_for_weight",
"get_color_for_compliance",
"get_status_color",
# Badge components
"create_badge",
"create_status_badge",
"create_multi_badge_row",
# Risk component
"create_risk_component",
# Table components
"create_info_table",
"create_data_table",
"create_findings_table",
"ColumnConfig",
# Section components
"create_section_header",
"create_summary_table",
# Chart functions
"get_chart_color_for_percentage",
"create_vertical_bar_chart",
"create_horizontal_bar_chart",
"create_radar_chart",
"create_pie_chart",
"create_stacked_bar_chart",
]

View File

@@ -0,0 +1,892 @@
import gc
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any
from celery.utils.log import get_task_logger
from reportlab.lib.enums import TA_CENTER
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
from reportlab.lib.units import inch
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.pdfgen import canvas
from reportlab.platypus import Image, PageBreak, Paragraph, SimpleDocTemplate, Spacer
from tasks.jobs.threatscore_utils import (
_aggregate_requirement_statistics_from_database,
_calculate_requirements_data_from_statistics,
_load_findings_for_requirement_checks,
)
from api.db_router import READ_REPLICA_ALIAS
from api.db_utils import rls_transaction
from api.models import Provider, StatusChoices
from api.utils import initialize_prowler_provider
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.outputs.finding import Finding as FindingOutput
from .components import (
ColumnConfig,
create_data_table,
create_info_table,
create_status_badge,
)
from .config import (
COLOR_BG_BLUE,
COLOR_BG_LIGHT_BLUE,
COLOR_BLUE,
COLOR_BORDER_GRAY,
COLOR_GRAY,
COLOR_LIGHT_BLUE,
COLOR_LIGHTER_BLUE,
COLOR_PROWLER_DARK_GREEN,
PADDING_LARGE,
PADDING_SMALL,
FrameworkConfig,
)
logger = get_task_logger(__name__)
# Register fonts (done once at module load)
_fonts_registered: bool = False
def _register_fonts() -> None:
"""Register custom fonts for PDF generation.
Uses a module-level flag to ensure fonts are only registered once,
avoiding duplicate registration errors from reportlab.
"""
global _fonts_registered
if _fonts_registered:
return
fonts_dir = os.path.join(os.path.dirname(__file__), "../../assets/fonts")
pdfmetrics.registerFont(
TTFont(
"PlusJakartaSans",
os.path.join(fonts_dir, "PlusJakartaSans-Regular.ttf"),
)
)
pdfmetrics.registerFont(
TTFont(
"FiraCode",
os.path.join(fonts_dir, "FiraCode-Regular.ttf"),
)
)
_fonts_registered = True
# =============================================================================
# Data Classes
# =============================================================================
@dataclass
class RequirementData:
"""Data for a single compliance requirement.
Attributes:
id: Requirement identifier
description: Requirement description
status: Compliance status (PASS, FAIL, MANUAL)
passed_findings: Number of passed findings
failed_findings: Number of failed findings
total_findings: Total number of findings
checks: List of check IDs associated with this requirement
attributes: Framework-specific requirement attributes
"""
id: str
description: str
status: str
passed_findings: int = 0
failed_findings: int = 0
total_findings: int = 0
checks: list[str] = field(default_factory=list)
attributes: Any = None
@dataclass
class ComplianceData:
"""Aggregated compliance data for report generation.
This dataclass holds all the data needed to generate a compliance report,
including compliance framework metadata, requirements, and findings.
Attributes:
tenant_id: Tenant identifier
scan_id: Scan identifier
provider_id: Provider identifier
compliance_id: Compliance framework identifier
framework: Framework name (e.g., "CIS", "ENS")
name: Full compliance framework name
version: Framework version
description: Framework description
requirements: List of RequirementData objects
attributes_by_requirement_id: Mapping of requirement IDs to their attributes
findings_by_check_id: Mapping of check IDs to their findings
provider_obj: Provider model object
prowler_provider: Initialized Prowler provider
"""
tenant_id: str
scan_id: str
provider_id: str
compliance_id: str
framework: str
name: str
version: str
description: str
requirements: list[RequirementData] = field(default_factory=list)
attributes_by_requirement_id: dict[str, dict] = field(default_factory=dict)
findings_by_check_id: dict[str, list[FindingOutput]] = field(default_factory=dict)
provider_obj: Provider | None = None
prowler_provider: Any = None
def get_requirement_metadata(
requirement_id: str,
attributes_by_requirement_id: dict[str, dict],
) -> Any | None:
"""Get the first requirement metadata object from attributes.
This helper function extracts the requirement metadata (req_attributes)
from the attributes dictionary. It's a common pattern used across all
report generators.
Args:
requirement_id: The requirement ID to look up.
attributes_by_requirement_id: Mapping of requirement IDs to their attributes.
Returns:
The first requirement attribute object, or None if not found.
Example:
>>> meta = get_requirement_metadata(req.id, data.attributes_by_requirement_id)
>>> if meta:
... section = getattr(meta, "Section", "Unknown")
"""
req_attrs = attributes_by_requirement_id.get(requirement_id, {})
meta_list = req_attrs.get("attributes", {}).get("req_attributes", [])
if meta_list:
return meta_list[0]
return None
# =============================================================================
# PDF Styles Cache
# =============================================================================
_PDF_STYLES_CACHE: dict[str, ParagraphStyle] | None = None
def create_pdf_styles() -> dict[str, ParagraphStyle]:
"""Create and return PDF paragraph styles used throughout the report.
Styles are cached on first call to improve performance.
Returns:
Dictionary containing the following styles:
- 'title': Title style with prowler green color
- 'h1': Heading 1 style with blue color and background
- 'h2': Heading 2 style with light blue color
- 'h3': Heading 3 style for sub-headings
- 'normal': Normal text style with left indent
- 'normal_center': Normal text style without indent
"""
global _PDF_STYLES_CACHE
if _PDF_STYLES_CACHE is not None:
return _PDF_STYLES_CACHE
_register_fonts()
styles = getSampleStyleSheet()
title_style = ParagraphStyle(
"CustomTitle",
parent=styles["Title"],
fontSize=24,
textColor=COLOR_PROWLER_DARK_GREEN,
spaceAfter=20,
fontName="PlusJakartaSans",
alignment=TA_CENTER,
)
h1 = ParagraphStyle(
"CustomH1",
parent=styles["Heading1"],
fontSize=18,
textColor=COLOR_BLUE,
spaceBefore=20,
spaceAfter=12,
fontName="PlusJakartaSans",
leftIndent=0,
borderWidth=2,
borderColor=COLOR_BLUE,
borderPadding=PADDING_LARGE,
backColor=COLOR_BG_BLUE,
)
h2 = ParagraphStyle(
"CustomH2",
parent=styles["Heading2"],
fontSize=14,
textColor=COLOR_LIGHT_BLUE,
spaceBefore=15,
spaceAfter=8,
fontName="PlusJakartaSans",
leftIndent=10,
borderWidth=1,
borderColor=COLOR_BORDER_GRAY,
borderPadding=5,
backColor=COLOR_BG_LIGHT_BLUE,
)
h3 = ParagraphStyle(
"CustomH3",
parent=styles["Heading3"],
fontSize=12,
textColor=COLOR_LIGHTER_BLUE,
spaceBefore=10,
spaceAfter=6,
fontName="PlusJakartaSans",
leftIndent=20,
)
normal = ParagraphStyle(
"CustomNormal",
parent=styles["Normal"],
fontSize=10,
textColor=COLOR_GRAY,
spaceBefore=PADDING_SMALL,
spaceAfter=PADDING_SMALL,
leftIndent=30,
fontName="PlusJakartaSans",
)
normal_center = ParagraphStyle(
"CustomNormalCenter",
parent=styles["Normal"],
fontSize=10,
textColor=COLOR_GRAY,
fontName="PlusJakartaSans",
)
_PDF_STYLES_CACHE = {
"title": title_style,
"h1": h1,
"h2": h2,
"h3": h3,
"normal": normal,
"normal_center": normal_center,
}
return _PDF_STYLES_CACHE
# =============================================================================
# Base Report Generator
# =============================================================================
class BaseComplianceReportGenerator(ABC):
"""Abstract base class for compliance PDF report generators.
This class implements the Template Method pattern, providing a common
structure for all compliance reports while allowing subclasses to
customize specific sections.
Subclasses must implement:
- create_executive_summary()
- create_charts_section()
- create_requirements_index()
Optionally, subclasses can override:
- create_cover_page()
- create_detailed_findings()
- get_footer_text()
"""
def __init__(self, config: FrameworkConfig):
"""Initialize the report generator.
Args:
config: Framework configuration
"""
self.config = config
self.styles = create_pdf_styles()
# =========================================================================
# Template Method
# =========================================================================
def generate(
self,
tenant_id: str,
scan_id: str,
compliance_id: str,
output_path: str,
provider_id: str,
provider_obj: Provider | None = None,
requirement_statistics: dict[str, dict[str, int]] | None = None,
findings_cache: dict[str, list[FindingOutput]] | None = None,
**kwargs,
) -> None:
"""Generate the PDF compliance report.
This is the template method that orchestrates the report generation.
It calls abstract methods that subclasses must implement.
Args:
tenant_id: Tenant identifier for RLS context
scan_id: Scan identifier
compliance_id: Compliance framework identifier
output_path: Path where the PDF will be saved
provider_id: Provider identifier
provider_obj: Optional pre-fetched Provider object
requirement_statistics: Optional pre-aggregated statistics
findings_cache: Optional pre-loaded findings cache
**kwargs: Additional framework-specific arguments
"""
logger.info(
"Generating %s report for scan %s", self.config.display_name, scan_id
)
try:
# 1. Load compliance data
data = self._load_compliance_data(
tenant_id=tenant_id,
scan_id=scan_id,
compliance_id=compliance_id,
provider_id=provider_id,
provider_obj=provider_obj,
requirement_statistics=requirement_statistics,
findings_cache=findings_cache,
)
# 2. Create PDF document
doc = self._create_document(output_path, data)
# 3. Build report elements incrementally to manage memory
# We collect garbage after heavy sections to prevent OOM on large reports
elements = []
# Cover page (lightweight)
elements.extend(self.create_cover_page(data))
elements.append(PageBreak())
# Executive summary (framework-specific)
elements.extend(self.create_executive_summary(data))
# Charts section (framework-specific) - heavy on memory due to matplotlib
elements.extend(self.create_charts_section(data))
elements.append(PageBreak())
gc.collect() # Free matplotlib resources
# Requirements index (framework-specific)
elements.extend(self.create_requirements_index(data))
elements.append(PageBreak())
# Detailed findings - heaviest section, loads findings on-demand
logger.info("Building detailed findings section...")
elements.extend(self.create_detailed_findings(data, **kwargs))
gc.collect() # Free findings data after processing
# 4. Build the PDF
logger.info("Building PDF document with %d elements...", len(elements))
self._build_pdf(doc, elements, data)
# Final cleanup
del elements
gc.collect()
logger.info("Successfully generated report at %s", output_path)
except Exception as e:
import traceback
tb_lineno = e.__traceback__.tb_lineno if e.__traceback__ else "unknown"
logger.error("Error generating report, line %s -- %s", tb_lineno, e)
logger.error("Full traceback:\n%s", traceback.format_exc())
raise
# =========================================================================
# Abstract Methods (must be implemented by subclasses)
# =========================================================================
@abstractmethod
def create_executive_summary(self, data: ComplianceData) -> list:
"""Create the executive summary section.
This section typically includes:
- Overall compliance score/metrics
- High-level statistics
- Critical findings summary
Args:
data: Aggregated compliance data
Returns:
List of ReportLab elements
"""
@abstractmethod
def create_charts_section(self, data: ComplianceData) -> list:
"""Create the charts and visualizations section.
This section typically includes:
- Compliance score charts by section
- Distribution charts
- Trend visualizations
Args:
data: Aggregated compliance data
Returns:
List of ReportLab elements
"""
@abstractmethod
def create_requirements_index(self, data: ComplianceData) -> list:
"""Create the requirements index/table of contents.
This section typically includes:
- Hierarchical list of requirements
- Status indicators
- Section groupings
Args:
data: Aggregated compliance data
Returns:
List of ReportLab elements
"""
# =========================================================================
# Common Methods (can be overridden by subclasses)
# =========================================================================
def create_cover_page(self, data: ComplianceData) -> list:
"""Create the report cover page.
Args:
data: Aggregated compliance data
Returns:
List of ReportLab elements
"""
elements = []
# Prowler logo
logo_path = os.path.join(
os.path.dirname(__file__), "../../assets/img/prowler_logo.png"
)
if os.path.exists(logo_path):
logo = Image(logo_path, width=5 * inch, height=1 * inch)
elements.append(logo)
elements.append(Spacer(1, 0.5 * inch))
# Title
title_text = f"{self.config.display_name} Report"
elements.append(Paragraph(title_text, self.styles["title"]))
elements.append(Spacer(1, 0.5 * inch))
# Compliance info table
info_rows = self._build_info_rows(data, language=self.config.language)
info_table = create_info_table(
rows=info_rows,
label_width=2 * inch,
value_width=4 * inch,
normal_style=self.styles["normal_center"],
)
elements.append(info_table)
return elements
def _build_info_rows(
self, data: ComplianceData, language: str = "en"
) -> list[tuple[str, str]]:
"""Build the standard info rows for the cover page table.
This helper method creates the common metadata rows used in all
report cover pages. Subclasses can use this to maintain consistency
while customizing other aspects of the cover page.
Args:
data: Aggregated compliance data.
language: Language for labels ("en" or "es").
Returns:
List of (label, value) tuples for the info table.
"""
# Labels based on language
labels = {
"en": {
"framework": "Framework:",
"id": "ID:",
"name": "Name:",
"version": "Version:",
"provider": "Provider:",
"account_id": "Account ID:",
"alias": "Alias:",
"scan_id": "Scan ID:",
"description": "Description:",
},
"es": {
"framework": "Framework:",
"id": "ID:",
"name": "Nombre:",
"version": "Versión:",
"provider": "Proveedor:",
"account_id": "Account ID:",
"alias": "Alias:",
"scan_id": "Scan ID:",
"description": "Descripción:",
},
}
lang_labels = labels.get(language, labels["en"])
info_rows = [
(lang_labels["framework"], data.framework),
(lang_labels["id"], data.compliance_id),
(lang_labels["name"], data.name),
(lang_labels["version"], data.version),
]
# Add provider info if available
if data.provider_obj:
info_rows.append(
(lang_labels["provider"], data.provider_obj.provider.upper())
)
info_rows.append(
(lang_labels["account_id"], data.provider_obj.uid or "N/A")
)
info_rows.append((lang_labels["alias"], data.provider_obj.alias or "N/A"))
info_rows.append((lang_labels["scan_id"], data.scan_id))
if data.description:
info_rows.append((lang_labels["description"], data.description))
return info_rows
def create_detailed_findings(self, data: ComplianceData, **kwargs) -> list:
"""Create the detailed findings section.
This default implementation creates a requirement-by-requirement
breakdown with findings tables. Subclasses can override for
framework-specific presentation.
This method implements on-demand loading of findings using the shared
findings cache to minimize database queries and memory usage.
Args:
data: Aggregated compliance data
**kwargs: Framework-specific options (e.g., only_failed)
Returns:
List of ReportLab elements
"""
elements = []
only_failed = kwargs.get("only_failed", True)
include_manual = kwargs.get("include_manual", False)
# Filter requirements if needed
requirements = data.requirements
if only_failed:
# Include FAIL requirements, and optionally MANUAL if include_manual is True
if include_manual:
requirements = [
r
for r in requirements
if r.status in (StatusChoices.FAIL, StatusChoices.MANUAL)
]
else:
requirements = [
r for r in requirements if r.status == StatusChoices.FAIL
]
# Collect all check IDs for requirements that will be displayed
# This allows us to load only the findings we actually need (memory optimization)
check_ids_to_load = []
for req in requirements:
check_ids_to_load.extend(req.checks)
# Load findings on-demand only for the checks that will be displayed
# Uses the shared findings cache to avoid duplicate queries across reports
logger.info("Loading findings on-demand for %d requirements", len(requirements))
findings_by_check_id = _load_findings_for_requirement_checks(
data.tenant_id,
data.scan_id,
check_ids_to_load,
data.prowler_provider,
data.findings_by_check_id, # Pass the cache to update it
)
for req in requirements:
# Requirement header
elements.append(
Paragraph(
f"{req.id}: {req.description}",
self.styles["h1"],
)
)
# Status badge
elements.append(create_status_badge(req.status))
elements.append(Spacer(1, 0.1 * inch))
# Findings for this requirement
for check_id in req.checks:
elements.append(Paragraph(f"Check: {check_id}", self.styles["h2"]))
findings = findings_by_check_id.get(check_id, [])
if not findings:
elements.append(
Paragraph(
"- No information for this finding currently",
self.styles["normal"],
)
)
else:
# Create findings table
findings_table = self._create_findings_table(findings)
elements.append(findings_table)
elements.append(Spacer(1, 0.1 * inch))
elements.append(PageBreak())
return elements
def get_footer_text(self, page_num: int) -> tuple[str, str]:
"""Get footer text for a page.
Args:
page_num: Current page number
Returns:
Tuple of (left_text, right_text) for the footer
"""
if self.config.language == "es":
page_text = f"Página {page_num}"
else:
page_text = f"Page {page_num}"
return page_text, "Powered by Prowler"
# =========================================================================
# Private Helper Methods
# =========================================================================
def _load_compliance_data(
self,
tenant_id: str,
scan_id: str,
compliance_id: str,
provider_id: str,
provider_obj: Provider | None,
requirement_statistics: dict | None,
findings_cache: dict | None,
) -> ComplianceData:
"""Load and aggregate compliance data from the database.
Args:
tenant_id: Tenant identifier
scan_id: Scan identifier
compliance_id: Compliance framework identifier
provider_id: Provider identifier
provider_obj: Optional pre-fetched Provider
requirement_statistics: Optional pre-aggregated statistics
findings_cache: Optional pre-loaded findings
Returns:
Aggregated ComplianceData object
"""
with rls_transaction(tenant_id, using=READ_REPLICA_ALIAS):
# Load provider
if provider_obj is None:
provider_obj = Provider.objects.get(id=provider_id)
prowler_provider = initialize_prowler_provider(provider_obj)
provider_type = provider_obj.provider
# Load compliance framework
frameworks_bulk = Compliance.get_bulk(provider_type)
compliance_obj = frameworks_bulk.get(compliance_id)
if not compliance_obj:
raise ValueError(f"Compliance framework not found: {compliance_id}")
framework = getattr(compliance_obj, "Framework", "N/A")
name = getattr(compliance_obj, "Name", "N/A")
version = getattr(compliance_obj, "Version", "N/A")
description = getattr(compliance_obj, "Description", "")
# Aggregate requirement statistics
if requirement_statistics is None:
logger.info("Aggregating requirement statistics for scan %s", scan_id)
requirement_statistics = _aggregate_requirement_statistics_from_database(
tenant_id, scan_id
)
else:
logger.info("Reusing pre-aggregated statistics for scan %s", scan_id)
# Calculate requirements data
attributes_by_requirement_id, requirements_list = (
_calculate_requirements_data_from_statistics(
compliance_obj, requirement_statistics
)
)
# Convert to RequirementData objects
requirements = []
for req_dict in requirements_list:
req = RequirementData(
id=req_dict["id"],
description=req_dict["attributes"].get("description", ""),
status=req_dict["attributes"].get("status", StatusChoices.MANUAL),
passed_findings=req_dict["attributes"].get("passed_findings", 0),
failed_findings=req_dict["attributes"].get("failed_findings", 0),
total_findings=req_dict["attributes"].get("total_findings", 0),
checks=attributes_by_requirement_id.get(req_dict["id"], {})
.get("attributes", {})
.get("checks", []),
)
requirements.append(req)
return ComplianceData(
tenant_id=tenant_id,
scan_id=scan_id,
provider_id=provider_id,
compliance_id=compliance_id,
framework=framework,
name=name,
version=version,
description=description,
requirements=requirements,
attributes_by_requirement_id=attributes_by_requirement_id,
findings_by_check_id=findings_cache if findings_cache is not None else {},
provider_obj=provider_obj,
prowler_provider=prowler_provider,
)
def _create_document(
self, output_path: str, data: ComplianceData
) -> SimpleDocTemplate:
"""Create the PDF document template.
Args:
output_path: Path for the output PDF
data: Compliance data for metadata
Returns:
Configured SimpleDocTemplate
"""
return SimpleDocTemplate(
output_path,
pagesize=letter,
title=f"{self.config.display_name} Report - {data.framework}",
author="Prowler",
subject=f"Compliance Report for {data.framework}",
creator="Prowler Engineering Team",
keywords=f"compliance,{data.framework},security,framework,prowler",
)
def _build_pdf(
self,
doc: SimpleDocTemplate,
elements: list,
data: ComplianceData,
) -> None:
"""Build the final PDF with footers.
Args:
doc: Document template
elements: List of ReportLab elements
data: Compliance data
"""
def add_footer(
canvas_obj: canvas.Canvas,
doc_template: SimpleDocTemplate,
) -> None:
canvas_obj.saveState()
width, _ = doc_template.pagesize
left_text, right_text = self.get_footer_text(doc_template.page)
canvas_obj.setFont("PlusJakartaSans", 9)
canvas_obj.setFillColorRGB(0.4, 0.4, 0.4)
canvas_obj.drawString(30, 20, left_text)
text_width = canvas_obj.stringWidth(right_text, "PlusJakartaSans", 9)
canvas_obj.drawString(width - text_width - 30, 20, right_text)
canvas_obj.restoreState()
doc.build(
elements,
onFirstPage=add_footer,
onLaterPages=add_footer,
)
def _create_findings_table(self, findings: list[FindingOutput]) -> Any:
"""Create a findings table.
Args:
findings: List of finding objects
Returns:
ReportLab Table element
"""
def get_finding_title(f):
metadata = getattr(f, "metadata", None)
if metadata:
return getattr(metadata, "CheckTitle", getattr(f, "check_id", ""))
return getattr(f, "check_id", "")
def get_resource_name(f):
name = getattr(f, "resource_name", "")
if not name:
name = getattr(f, "resource_uid", "")
return name
def get_severity(f):
metadata = getattr(f, "metadata", None)
if metadata:
return getattr(metadata, "Severity", "").capitalize()
return ""
# Convert findings to dicts for the table
data = []
for f in findings:
item = {
"title": get_finding_title(f),
"resource_name": get_resource_name(f),
"severity": get_severity(f),
"status": getattr(f, "status", "").upper(),
"region": getattr(f, "region", "global"),
}
data.append(item)
columns = [
ColumnConfig("Finding", 2.5 * inch, "title"),
ColumnConfig("Resource", 3 * inch, "resource_name"),
ColumnConfig("Severity", 0.9 * inch, "severity"),
ColumnConfig("Status", 0.9 * inch, "status"),
ColumnConfig("Region", 0.9 * inch, "region"),
]
return create_data_table(
data=data,
columns=columns,
header_color=self.config.primary_color,
normal_style=self.styles["normal_center"],
)

View File

@@ -0,0 +1,404 @@
import gc
import io
import math
from typing import Callable
import matplotlib
# Use non-interactive Agg backend for memory efficiency in server environments
# This MUST be set before importing pyplot
matplotlib.use("Agg")
import matplotlib.pyplot as plt # noqa: E402
from .config import ( # noqa: E402
CHART_COLOR_BLUE,
CHART_COLOR_GREEN_1,
CHART_COLOR_GREEN_2,
CHART_COLOR_ORANGE,
CHART_COLOR_RED,
CHART_COLOR_YELLOW,
CHART_DPI_DEFAULT,
)
# Use centralized DPI setting from config
DEFAULT_CHART_DPI = CHART_DPI_DEFAULT
def get_chart_color_for_percentage(percentage: float) -> str:
"""Get chart color string based on percentage.
Args:
percentage: Value between 0 and 100
Returns:
Hex color string for matplotlib
"""
if percentage >= 80:
return CHART_COLOR_GREEN_1
if percentage >= 60:
return CHART_COLOR_GREEN_2
if percentage >= 40:
return CHART_COLOR_YELLOW
if percentage >= 20:
return CHART_COLOR_ORANGE
return CHART_COLOR_RED
def create_vertical_bar_chart(
labels: list[str],
values: list[float],
ylabel: str = "Compliance Score (%)",
xlabel: str = "Section",
title: str | None = None,
color_func: Callable[[float], str] | None = None,
colors: list[str] | None = None,
figsize: tuple[int, int] = (10, 6),
dpi: int = DEFAULT_CHART_DPI,
y_limit: tuple[float, float] = (0, 100),
show_labels: bool = True,
rotation: int = 45,
) -> io.BytesIO:
"""Create a vertical bar chart.
Args:
labels: X-axis labels
values: Bar heights (numeric values)
ylabel: Y-axis label
xlabel: X-axis label
title: Optional chart title
color_func: Function to determine bar color based on value
colors: Explicit list of colors (overrides color_func)
figsize: Figure size (width, height) in inches
dpi: Resolution for output image
y_limit: Y-axis limits (min, max)
show_labels: Whether to show value labels on bars
rotation: X-axis label rotation angle
Returns:
BytesIO buffer containing the PNG image
"""
if color_func is None:
color_func = get_chart_color_for_percentage
fig, ax = plt.subplots(figsize=figsize)
# Determine colors
if colors is None:
colors_list = [color_func(v) for v in values]
else:
colors_list = colors
bars = ax.bar(labels, values, color=colors_list)
ax.set_ylabel(ylabel, fontsize=12)
ax.set_xlabel(xlabel, fontsize=12)
ax.set_ylim(*y_limit)
if title:
ax.set_title(title, fontsize=14, fontweight="bold")
# Add value labels on bars
if show_labels:
for bar_item, value in zip(bars, values):
height = bar_item.get_height()
ax.text(
bar_item.get_x() + bar_item.get_width() / 2.0,
height + 1,
f"{value:.1f}%",
ha="center",
va="bottom",
fontweight="bold",
)
plt.xticks(rotation=rotation, ha="right")
ax.grid(True, alpha=0.3, axis="y")
plt.tight_layout()
buffer = io.BytesIO()
try:
fig.savefig(buffer, format="png", dpi=dpi, bbox_inches="tight")
buffer.seek(0)
finally:
plt.close(fig)
gc.collect() # Force garbage collection after heavy matplotlib operation
return buffer
def create_horizontal_bar_chart(
labels: list[str],
values: list[float],
xlabel: str = "Compliance (%)",
title: str | None = None,
color_func: Callable[[float], str] | None = None,
colors: list[str] | None = None,
figsize: tuple[int, int] | None = None,
dpi: int = DEFAULT_CHART_DPI,
x_limit: tuple[float, float] = (0, 100),
show_labels: bool = True,
label_fontsize: int = 16,
) -> io.BytesIO:
"""Create a horizontal bar chart.
Args:
labels: Y-axis labels (bar names)
values: Bar widths (numeric values)
xlabel: X-axis label
title: Optional chart title
color_func: Function to determine bar color based on value
colors: Explicit list of colors (overrides color_func)
figsize: Figure size (auto-calculated if None based on label count)
dpi: Resolution for output image
x_limit: X-axis limits (min, max)
show_labels: Whether to show value labels on bars
label_fontsize: Font size for y-axis labels
Returns:
BytesIO buffer containing the PNG image
"""
if color_func is None:
color_func = get_chart_color_for_percentage
# Auto-calculate figure size based on number of items
if figsize is None:
figsize = (10, max(6, int(len(labels) * 0.4)))
fig, ax = plt.subplots(figsize=figsize)
# Determine colors
if colors is None:
colors_list = [color_func(v) for v in values]
else:
colors_list = colors
y_pos = range(len(labels))
bars = ax.barh(y_pos, values, color=colors_list)
ax.set_yticks(y_pos)
ax.set_yticklabels(labels, fontsize=label_fontsize)
ax.set_xlabel(xlabel, fontsize=14)
ax.set_xlim(*x_limit)
if title:
ax.set_title(title, fontsize=14, fontweight="bold")
# Add value labels
if show_labels:
for bar_item, value in zip(bars, values):
width = bar_item.get_width()
ax.text(
width + 1,
bar_item.get_y() + bar_item.get_height() / 2.0,
f"{value:.1f}%",
ha="left",
va="center",
fontweight="bold",
fontsize=10,
)
ax.grid(True, alpha=0.3, axis="x")
plt.tight_layout()
buffer = io.BytesIO()
try:
fig.savefig(buffer, format="png", dpi=dpi, bbox_inches="tight")
buffer.seek(0)
finally:
plt.close(fig)
gc.collect() # Force garbage collection after heavy matplotlib operation
return buffer
def create_radar_chart(
labels: list[str],
values: list[float],
color: str = CHART_COLOR_BLUE,
fill_alpha: float = 0.25,
figsize: tuple[int, int] = (8, 8),
dpi: int = DEFAULT_CHART_DPI,
y_limit: tuple[float, float] = (0, 100),
y_ticks: list[int] | None = None,
label_fontsize: int = 14,
title: str | None = None,
) -> io.BytesIO:
"""Create a radar/spider chart.
Args:
labels: Category names around the chart
values: Values for each category (should have same length as labels)
color: Line and fill color
fill_alpha: Transparency of the fill (0-1)
figsize: Figure size (width, height) in inches
dpi: Resolution for output image
y_limit: Radial axis limits (min, max)
y_ticks: Custom tick values for radial axis
label_fontsize: Font size for category labels
title: Optional chart title
Returns:
BytesIO buffer containing the PNG image
"""
num_vars = len(labels)
angles = [n / float(num_vars) * 2 * math.pi for n in range(num_vars)]
# Close the polygon
values_closed = list(values) + [values[0]]
angles_closed = angles + [angles[0]]
fig, ax = plt.subplots(figsize=figsize, subplot_kw={"projection": "polar"})
ax.plot(angles_closed, values_closed, "o-", linewidth=2, color=color)
ax.fill(angles_closed, values_closed, alpha=fill_alpha, color=color)
ax.set_xticks(angles)
ax.set_xticklabels(labels, fontsize=label_fontsize)
ax.set_ylim(*y_limit)
if y_ticks is None:
y_ticks = [20, 40, 60, 80, 100]
ax.set_yticks(y_ticks)
ax.set_yticklabels([f"{t}%" for t in y_ticks], fontsize=12)
ax.grid(True, alpha=0.3)
if title:
ax.set_title(title, fontsize=14, fontweight="bold", y=1.08)
plt.tight_layout()
buffer = io.BytesIO()
try:
fig.savefig(buffer, format="png", dpi=dpi, bbox_inches="tight")
buffer.seek(0)
finally:
plt.close(fig)
gc.collect() # Force garbage collection after heavy matplotlib operation
return buffer
def create_pie_chart(
labels: list[str],
values: list[float],
colors: list[str] | None = None,
figsize: tuple[int, int] = (6, 6),
dpi: int = DEFAULT_CHART_DPI,
autopct: str = "%1.1f%%",
startangle: int = 90,
title: str | None = None,
) -> io.BytesIO:
"""Create a pie chart.
Args:
labels: Slice labels
values: Slice values
colors: Optional list of colors for slices
figsize: Figure size (width, height) in inches
dpi: Resolution for output image
autopct: Format string for percentage labels
startangle: Starting angle for first slice
title: Optional chart title
Returns:
BytesIO buffer containing the PNG image
"""
fig, ax = plt.subplots(figsize=figsize)
_, _, autotexts = ax.pie(
values,
labels=labels,
colors=colors,
autopct=autopct,
startangle=startangle,
)
# Style the text
for autotext in autotexts:
autotext.set_fontweight("bold")
if title:
ax.set_title(title, fontsize=14, fontweight="bold")
plt.tight_layout()
buffer = io.BytesIO()
try:
fig.savefig(buffer, format="png", dpi=dpi, bbox_inches="tight")
buffer.seek(0)
finally:
plt.close(fig)
gc.collect() # Force garbage collection after heavy matplotlib operation
return buffer
def create_stacked_bar_chart(
labels: list[str],
data_series: dict[str, list[float]],
colors: dict[str, str] | None = None,
xlabel: str = "",
ylabel: str = "Count",
title: str | None = None,
figsize: tuple[int, int] = (10, 6),
dpi: int = DEFAULT_CHART_DPI,
rotation: int = 45,
show_legend: bool = True,
) -> io.BytesIO:
"""Create a stacked bar chart.
Args:
labels: X-axis labels
data_series: Dictionary mapping series name to list of values
colors: Dictionary mapping series name to color
xlabel: X-axis label
ylabel: Y-axis label
title: Optional chart title
figsize: Figure size (width, height) in inches
dpi: Resolution for output image
rotation: X-axis label rotation angle
show_legend: Whether to show the legend
Returns:
BytesIO buffer containing the PNG image
"""
fig, ax = plt.subplots(figsize=figsize)
# Default colors if not provided
default_colors = {
"Pass": CHART_COLOR_GREEN_1,
"Fail": CHART_COLOR_RED,
"Manual": CHART_COLOR_YELLOW,
}
if colors is None:
colors = default_colors
bottom = [0] * len(labels)
for series_name, values in data_series.items():
color = colors.get(series_name, CHART_COLOR_BLUE)
ax.bar(labels, values, bottom=bottom, label=series_name, color=color)
bottom = [b + v for b, v in zip(bottom, values)]
ax.set_xlabel(xlabel, fontsize=12)
ax.set_ylabel(ylabel, fontsize=12)
if title:
ax.set_title(title, fontsize=14, fontweight="bold")
plt.xticks(rotation=rotation, ha="right")
if show_legend:
ax.legend()
ax.grid(True, alpha=0.3, axis="y")
plt.tight_layout()
buffer = io.BytesIO()
try:
fig.savefig(buffer, format="png", dpi=dpi, bbox_inches="tight")
buffer.seek(0)
finally:
plt.close(fig)
gc.collect() # Force garbage collection after heavy matplotlib operation
return buffer

View File

@@ -0,0 +1,599 @@
from dataclasses import dataclass
from typing import Any, Callable
from reportlab.lib import colors
from reportlab.lib.styles import ParagraphStyle
from reportlab.lib.units import inch
from reportlab.platypus import LongTable, Paragraph, Spacer, Table, TableStyle
from .config import (
ALTERNATE_ROWS_MAX_SIZE,
COLOR_BLUE,
COLOR_BORDER_GRAY,
COLOR_DARK_GRAY,
COLOR_GRID_GRAY,
COLOR_HIGH_RISK,
COLOR_LIGHT_GRAY,
COLOR_LOW_RISK,
COLOR_MEDIUM_RISK,
COLOR_SAFE,
COLOR_WHITE,
LONG_TABLE_THRESHOLD,
PADDING_LARGE,
PADDING_MEDIUM,
PADDING_SMALL,
PADDING_XLARGE,
)
def get_color_for_risk_level(risk_level: int) -> colors.Color:
"""
Get color based on risk level.
Args:
risk_level (int): Numeric risk level (0-5).
Returns:
colors.Color: Appropriate color for the risk level.
"""
if risk_level >= 4:
return COLOR_HIGH_RISK
if risk_level >= 3:
return COLOR_MEDIUM_RISK
if risk_level >= 2:
return COLOR_LOW_RISK
return COLOR_SAFE
def get_color_for_weight(weight: int) -> colors.Color:
"""
Get color based on weight value.
Args:
weight (int): Numeric weight value.
Returns:
colors.Color: Appropriate color for the weight.
"""
if weight > 100:
return COLOR_HIGH_RISK
if weight > 50:
return COLOR_LOW_RISK
return COLOR_SAFE
def get_color_for_compliance(percentage: float) -> colors.Color:
"""
Get color based on compliance percentage.
Args:
percentage (float): Compliance percentage (0-100).
Returns:
colors.Color: Appropriate color for the compliance level.
"""
if percentage >= 80:
return COLOR_SAFE
if percentage >= 60:
return COLOR_LOW_RISK
return COLOR_HIGH_RISK
def get_status_color(status: str) -> colors.Color:
"""
Get color for a status value.
Args:
status (str): Status string (PASS, FAIL, MANUAL, etc.).
Returns:
colors.Color: Appropriate color for the status.
"""
status_upper = status.upper()
if status_upper == "PASS":
return COLOR_SAFE
if status_upper == "FAIL":
return COLOR_HIGH_RISK
return COLOR_DARK_GRAY
def create_badge(
text: str,
bg_color: colors.Color,
text_color: colors.Color = COLOR_WHITE,
width: float = 1.4 * inch,
font: str = "FiraCode",
font_size: int = 11,
) -> Table:
"""
Create a generic colored badge component.
Args:
text (str): Text to display in the badge.
bg_color (colors.Color): Background color.
text_color (colors.Color): Text color (default white).
width (float): Badge width in inches.
font (str): Font name to use.
font_size (int): Font size.
Returns:
Table: A Table object styled as a badge.
"""
data = [[text]]
table = Table(data, colWidths=[width])
table.setStyle(
TableStyle(
[
("BACKGROUND", (0, 0), (0, 0), bg_color),
("TEXTCOLOR", (0, 0), (0, 0), text_color),
("FONTNAME", (0, 0), (0, 0), font),
("ALIGN", (0, 0), (-1, -1), "CENTER"),
("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
("FONTSIZE", (0, 0), (-1, -1), font_size),
("GRID", (0, 0), (-1, -1), 0.5, colors.black),
("LEFTPADDING", (0, 0), (-1, -1), PADDING_LARGE),
("RIGHTPADDING", (0, 0), (-1, -1), PADDING_LARGE),
("TOPPADDING", (0, 0), (-1, -1), PADDING_LARGE),
("BOTTOMPADDING", (0, 0), (-1, -1), PADDING_LARGE),
]
)
)
return table
def create_status_badge(status: str) -> Table:
"""
Create a PASS/FAIL/MANUAL status badge.
Args:
status (str): Status value (e.g., "PASS", "FAIL", "MANUAL").
Returns:
Table: A styled Table badge for the status.
"""
status_upper = status.upper()
status_color = get_status_color(status_upper)
data = [["State:", status_upper]]
table = Table(data, colWidths=[0.6 * inch, 0.8 * inch])
table.setStyle(
TableStyle(
[
("BACKGROUND", (0, 0), (0, 0), COLOR_LIGHT_GRAY),
("FONTNAME", (0, 0), (0, 0), "PlusJakartaSans"),
("BACKGROUND", (1, 0), (1, 0), status_color),
("TEXTCOLOR", (1, 0), (1, 0), COLOR_WHITE),
("FONTNAME", (1, 0), (1, 0), "FiraCode"),
("ALIGN", (0, 0), (-1, -1), "CENTER"),
("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
("FONTSIZE", (0, 0), (-1, -1), 12),
("GRID", (0, 0), (-1, -1), 0.5, colors.black),
("LEFTPADDING", (0, 0), (-1, -1), PADDING_LARGE),
("RIGHTPADDING", (0, 0), (-1, -1), PADDING_LARGE),
("TOPPADDING", (0, 0), (-1, -1), PADDING_XLARGE),
("BOTTOMPADDING", (0, 0), (-1, -1), PADDING_XLARGE),
]
)
)
return table
def create_multi_badge_row(
badges: list[tuple[str, colors.Color]],
badge_width: float = 0.4 * inch,
font: str = "FiraCode",
) -> Table:
"""
Create a row of multiple small badges.
Args:
badges (list[tuple[str, colors.Color]]): List of (text, color) tuples for each badge.
badge_width (float): Width of each badge.
font (str): Font name to use.
Returns:
Table: A Table with multiple colored badges in a row.
"""
if not badges:
data = [["N/A"]]
table = Table(data, colWidths=[1 * inch])
table.setStyle(
TableStyle(
[
("BACKGROUND", (0, 0), (0, 0), COLOR_LIGHT_GRAY),
("ALIGN", (0, 0), (-1, -1), "CENTER"),
("FONTSIZE", (0, 0), (-1, -1), 10),
]
)
)
return table
data = [[text for text, _ in badges]]
col_widths = [badge_width] * len(badges)
table = Table(data, colWidths=col_widths)
styles = [
("ALIGN", (0, 0), (-1, -1), "CENTER"),
("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
("FONTNAME", (0, 0), (-1, -1), font),
("FONTSIZE", (0, 0), (-1, -1), 10),
("TEXTCOLOR", (0, 0), (-1, -1), COLOR_WHITE),
("GRID", (0, 0), (-1, -1), 0.5, colors.black),
("LEFTPADDING", (0, 0), (-1, -1), PADDING_SMALL),
("RIGHTPADDING", (0, 0), (-1, -1), PADDING_SMALL),
("TOPPADDING", (0, 0), (-1, -1), PADDING_MEDIUM),
("BOTTOMPADDING", (0, 0), (-1, -1), PADDING_MEDIUM),
]
for idx, (_, badge_color) in enumerate(badges):
styles.append(("BACKGROUND", (idx, 0), (idx, 0), badge_color))
table.setStyle(TableStyle(styles))
return table
def create_risk_component(
risk_level: int,
weight: int,
score: int = 0,
) -> Table:
"""
Create a visual risk component showing risk level, weight, and score.
Args:
risk_level (int): The risk level (0-5).
weight (int): The weight value.
score (int): The calculated score (default 0).
Returns:
Table: A styled Table showing risk metrics.
"""
risk_color = get_color_for_risk_level(risk_level)
weight_color = get_color_for_weight(weight)
data = [
[
"Risk Level:",
str(risk_level),
"Weight:",
str(weight),
"Score:",
str(score),
]
]
table = Table(
data,
colWidths=[
0.8 * inch,
0.4 * inch,
0.6 * inch,
0.4 * inch,
0.5 * inch,
0.4 * inch,
],
)
table.setStyle(
TableStyle(
[
("BACKGROUND", (0, 0), (0, 0), COLOR_LIGHT_GRAY),
("BACKGROUND", (1, 0), (1, 0), risk_color),
("TEXTCOLOR", (1, 0), (1, 0), COLOR_WHITE),
("FONTNAME", (1, 0), (1, 0), "FiraCode"),
("BACKGROUND", (2, 0), (2, 0), COLOR_LIGHT_GRAY),
("BACKGROUND", (3, 0), (3, 0), weight_color),
("TEXTCOLOR", (3, 0), (3, 0), COLOR_WHITE),
("FONTNAME", (3, 0), (3, 0), "FiraCode"),
("BACKGROUND", (4, 0), (4, 0), COLOR_LIGHT_GRAY),
("BACKGROUND", (5, 0), (5, 0), COLOR_DARK_GRAY),
("TEXTCOLOR", (5, 0), (5, 0), COLOR_WHITE),
("FONTNAME", (5, 0), (5, 0), "FiraCode"),
("ALIGN", (0, 0), (-1, -1), "CENTER"),
("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
("FONTSIZE", (0, 0), (-1, -1), 10),
("GRID", (0, 0), (-1, -1), 0.5, colors.black),
("LEFTPADDING", (0, 0), (-1, -1), PADDING_MEDIUM),
("RIGHTPADDING", (0, 0), (-1, -1), PADDING_MEDIUM),
("TOPPADDING", (0, 0), (-1, -1), PADDING_LARGE),
("BOTTOMPADDING", (0, 0), (-1, -1), PADDING_LARGE),
]
)
)
return table
def create_info_table(
rows: list[tuple[str, Any]],
label_width: float = 2 * inch,
value_width: float = 4 * inch,
label_color: colors.Color = COLOR_BLUE,
value_bg_color: colors.Color | None = None,
normal_style: ParagraphStyle | None = None,
) -> Table:
"""
Create a key-value information table.
Args:
rows (list[tuple[str, Any]]): List of (label, value) tuples.
label_width (float): Width of the label column.
value_width (float): Width of the value column.
label_color (colors.Color): Background color for labels.
value_bg_color (colors.Color | None): Background color for values (optional).
normal_style (ParagraphStyle | None): ParagraphStyle for wrapping long values.
Returns:
Table: A styled Table with key-value pairs.
"""
from .config import COLOR_BG_BLUE
if value_bg_color is None:
value_bg_color = COLOR_BG_BLUE
# Handle empty rows case - Table requires at least one row
if not rows:
table = Table([["", ""]], colWidths=[label_width, value_width])
table.setStyle(TableStyle([("FONTSIZE", (0, 0), (-1, -1), 0)]))
return table
# Process rows - wrap long values in Paragraph if style provided
table_data = []
for label, value in rows:
if normal_style and isinstance(value, str) and len(value) > 50:
value = Paragraph(value, normal_style)
table_data.append([label, value])
table = Table(table_data, colWidths=[label_width, value_width])
table.setStyle(
TableStyle(
[
("BACKGROUND", (0, 0), (0, -1), label_color),
("TEXTCOLOR", (0, 0), (0, -1), COLOR_WHITE),
("FONTNAME", (0, 0), (0, -1), "FiraCode"),
("BACKGROUND", (1, 0), (1, -1), value_bg_color),
("TEXTCOLOR", (1, 0), (1, -1), COLOR_DARK_GRAY),
("FONTNAME", (1, 0), (1, -1), "PlusJakartaSans"),
("ALIGN", (0, 0), (-1, -1), "LEFT"),
("VALIGN", (0, 0), (-1, -1), "TOP"),
("FONTSIZE", (0, 0), (-1, -1), 11),
("GRID", (0, 0), (-1, -1), 1, COLOR_BORDER_GRAY),
("LEFTPADDING", (0, 0), (-1, -1), PADDING_XLARGE),
("RIGHTPADDING", (0, 0), (-1, -1), PADDING_XLARGE),
("TOPPADDING", (0, 0), (-1, -1), PADDING_LARGE),
("BOTTOMPADDING", (0, 0), (-1, -1), PADDING_LARGE),
]
)
)
return table
@dataclass
class ColumnConfig:
"""
Configuration for a table column.
Attributes:
header (str): Column header text.
width (float): Column width in inches.
field (str | Callable[[Any], str]): Field name or callable to extract value from data.
align (str): Text alignment (LEFT, CENTER, RIGHT).
"""
header: str
width: float
field: str | Callable[[Any], str]
align: str = "CENTER"
def create_data_table(
data: list[dict[str, Any]],
columns: list[ColumnConfig],
header_color: colors.Color = COLOR_BLUE,
alternate_rows: bool = True,
normal_style: ParagraphStyle | None = None,
) -> Table | LongTable:
"""
Create a data table with configurable columns.
Uses LongTable for large datasets (>50 rows) for better memory efficiency
and page splitting. LongTable repeats headers on each page and has
optimized memory handling for large tables.
Args:
data (list[dict[str, Any]]): List of data dictionaries.
columns (list[ColumnConfig]): Column configuration list.
header_color (colors.Color): Background color for header row.
alternate_rows (bool): Whether to alternate row backgrounds.
normal_style (ParagraphStyle | None): ParagraphStyle for cell values.
Returns:
Table or LongTable: A styled table with data.
"""
# Build header row
header_row = [col.header for col in columns]
table_data = [header_row]
# Build data rows
for item in data:
row = []
for col in columns:
if callable(col.field):
value = col.field(item)
else:
value = item.get(col.field, "")
if normal_style and isinstance(value, str):
value = Paragraph(value, normal_style)
row.append(value)
table_data.append(row)
col_widths = [col.width for col in columns]
# Use LongTable for large datasets - it handles page breaks better
# and has optimized memory handling for tables with many rows
use_long_table = len(data) > LONG_TABLE_THRESHOLD
if use_long_table:
table = LongTable(table_data, colWidths=col_widths, repeatRows=1)
else:
table = Table(table_data, colWidths=col_widths)
styles = [
("BACKGROUND", (0, 0), (-1, 0), header_color),
("TEXTCOLOR", (0, 0), (-1, 0), COLOR_WHITE),
("FONTNAME", (0, 0), (-1, 0), "FiraCode"),
("FONTSIZE", (0, 0), (-1, 0), 10),
("FONTSIZE", (0, 1), (-1, -1), 9),
("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
("GRID", (0, 0), (-1, -1), 1, COLOR_GRID_GRAY),
("LEFTPADDING", (0, 0), (-1, -1), PADDING_MEDIUM),
("RIGHTPADDING", (0, 0), (-1, -1), PADDING_MEDIUM),
("TOPPADDING", (0, 0), (-1, -1), PADDING_MEDIUM),
("BOTTOMPADDING", (0, 0), (-1, -1), PADDING_MEDIUM),
]
# Apply column alignments
for idx, col in enumerate(columns):
styles.append(("ALIGN", (idx, 0), (idx, -1), col.align))
# Alternate row backgrounds - skip for very large tables as it adds memory overhead
if (
alternate_rows
and len(table_data) > 1
and len(table_data) <= ALTERNATE_ROWS_MAX_SIZE
):
for i in range(1, len(table_data)):
if i % 2 == 0:
styles.append(
("BACKGROUND", (0, i), (-1, i), colors.Color(0.98, 0.98, 0.98))
)
table.setStyle(TableStyle(styles))
return table
def create_findings_table(
findings: list[Any],
columns: list[ColumnConfig] | None = None,
header_color: colors.Color = COLOR_BLUE,
normal_style: ParagraphStyle | None = None,
) -> Table:
"""
Create a findings table with default or custom columns.
Args:
findings (list[Any]): List of finding objects.
columns (list[ColumnConfig] | None): Optional column configuration (defaults to standard columns).
header_color (colors.Color): Background color for header row.
normal_style (ParagraphStyle | None): ParagraphStyle for cell values.
Returns:
Table: A styled Table with findings data.
"""
if columns is None:
columns = [
ColumnConfig("Finding", 2.5 * inch, "title"),
ColumnConfig("Resource", 3 * inch, "resource_name"),
ColumnConfig("Severity", 0.9 * inch, "severity"),
ColumnConfig("Status", 0.9 * inch, "status"),
ColumnConfig("Region", 0.9 * inch, "region"),
]
# Convert findings to dicts
data = []
for finding in findings:
item = {}
for col in columns:
if callable(col.field):
item[col.header.lower()] = col.field(finding)
elif hasattr(finding, col.field):
item[col.field] = getattr(finding, col.field, "")
elif isinstance(finding, dict):
item[col.field] = finding.get(col.field, "")
data.append(item)
return create_data_table(
data=data,
columns=columns,
header_color=header_color,
alternate_rows=True,
normal_style=normal_style,
)
def create_section_header(
text: str,
style: ParagraphStyle,
add_spacer: bool = True,
spacer_height: float = 0.2,
) -> list:
"""
Create a section header with optional spacer.
Args:
text (str): Header text.
style (ParagraphStyle): ParagraphStyle to apply.
add_spacer (bool): Whether to add a spacer after the header.
spacer_height (float): Height of the spacer in inches.
Returns:
list: List of elements (Paragraph and optional Spacer).
"""
elements = [Paragraph(text, style)]
if add_spacer:
elements.append(Spacer(1, spacer_height * inch))
return elements
def create_summary_table(
label: str,
value: str,
value_color: colors.Color,
label_width: float = 2.5 * inch,
value_width: float = 2 * inch,
) -> Table:
"""
Create a summary metric table (e.g., for ThreatScore display).
Args:
label (str): Label text (e.g., "ThreatScore:").
value (str): Value text (e.g., "85.5%").
value_color (colors.Color): Background color for the value cell.
label_width (float): Width of the label column.
value_width (float): Width of the value column.
Returns:
Table: A styled summary Table.
"""
data = [[label, value]]
table = Table(data, colWidths=[label_width, value_width])
table.setStyle(
TableStyle(
[
("BACKGROUND", (0, 0), (0, 0), colors.Color(0.1, 0.3, 0.5)),
("TEXTCOLOR", (0, 0), (0, 0), COLOR_WHITE),
("FONTNAME", (0, 0), (0, 0), "FiraCode"),
("FONTSIZE", (0, 0), (0, 0), 12),
("BACKGROUND", (1, 0), (1, 0), value_color),
("TEXTCOLOR", (1, 0), (1, 0), COLOR_WHITE),
("FONTNAME", (1, 0), (1, 0), "FiraCode"),
("FONTSIZE", (1, 0), (1, 0), 16),
("ALIGN", (0, 0), (-1, -1), "CENTER"),
("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
("GRID", (0, 0), (-1, -1), 1.5, colors.Color(0.5, 0.6, 0.7)),
("LEFTPADDING", (0, 0), (-1, -1), 12),
("RIGHTPADDING", (0, 0), (-1, -1), 12),
("TOPPADDING", (0, 0), (-1, -1), 10),
("BOTTOMPADDING", (0, 0), (-1, -1), 10),
]
)
)
return table

View File

@@ -0,0 +1,286 @@
from dataclasses import dataclass, field
from reportlab.lib import colors
from reportlab.lib.units import inch
# =============================================================================
# Performance & Memory Optimization Settings
# =============================================================================
# These settings control memory usage and performance for large reports.
# Adjust these values if workers are running out of memory.
# Chart settings - lower DPI = less memory, 150 is good quality for PDF
CHART_DPI_DEFAULT = 150
# LongTable threshold - use LongTable for tables with more rows than this
# LongTable handles page breaks better and has optimized memory for large tables
LONG_TABLE_THRESHOLD = 50
# Skip alternating row colors for tables larger than this (reduces memory)
ALTERNATE_ROWS_MAX_SIZE = 200
# Database query batch size for findings (matches Django settings)
# Larger = fewer queries but more memory per batch
FINDINGS_BATCH_SIZE = 2000
# =============================================================================
# Base colors
# =============================================================================
COLOR_PROWLER_DARK_GREEN = colors.Color(0.1, 0.5, 0.2)
COLOR_BLUE = colors.Color(0.2, 0.4, 0.6)
COLOR_LIGHT_BLUE = colors.Color(0.3, 0.5, 0.7)
COLOR_LIGHTER_BLUE = colors.Color(0.4, 0.6, 0.8)
COLOR_BG_BLUE = colors.Color(0.95, 0.97, 1.0)
COLOR_BG_LIGHT_BLUE = colors.Color(0.98, 0.99, 1.0)
COLOR_GRAY = colors.Color(0.2, 0.2, 0.2)
COLOR_LIGHT_GRAY = colors.Color(0.9, 0.9, 0.9)
COLOR_BORDER_GRAY = colors.Color(0.7, 0.8, 0.9)
COLOR_GRID_GRAY = colors.Color(0.7, 0.7, 0.7)
COLOR_DARK_GRAY = colors.Color(0.4, 0.4, 0.4)
COLOR_HEADER_DARK = colors.Color(0.1, 0.3, 0.5)
COLOR_HEADER_MEDIUM = colors.Color(0.15, 0.35, 0.55)
COLOR_WHITE = colors.white
# Risk and status colors
COLOR_HIGH_RISK = colors.Color(0.8, 0.2, 0.2)
COLOR_MEDIUM_RISK = colors.Color(0.9, 0.6, 0.2)
COLOR_LOW_RISK = colors.Color(0.9, 0.9, 0.2)
COLOR_SAFE = colors.Color(0.2, 0.8, 0.2)
# ENS specific colors
COLOR_ENS_ALTO = colors.Color(0.8, 0.2, 0.2)
COLOR_ENS_MEDIO = colors.Color(0.98, 0.75, 0.13)
COLOR_ENS_BAJO = colors.Color(0.06, 0.72, 0.51)
COLOR_ENS_OPCIONAL = colors.Color(0.42, 0.45, 0.50)
COLOR_ENS_TIPO = colors.Color(0.2, 0.4, 0.6)
COLOR_ENS_AUTO = colors.Color(0.30, 0.69, 0.31)
COLOR_ENS_MANUAL = colors.Color(0.96, 0.60, 0.0)
# NIS2 specific colors
COLOR_NIS2_PRIMARY = colors.Color(0.12, 0.23, 0.54)
COLOR_NIS2_SECONDARY = colors.Color(0.23, 0.51, 0.96)
COLOR_NIS2_BG_BLUE = colors.Color(0.96, 0.97, 0.99)
# Chart colors (hex strings for matplotlib)
CHART_COLOR_GREEN_1 = "#4CAF50"
CHART_COLOR_GREEN_2 = "#8BC34A"
CHART_COLOR_YELLOW = "#FFEB3B"
CHART_COLOR_ORANGE = "#FF9800"
CHART_COLOR_RED = "#F44336"
CHART_COLOR_BLUE = "#2196F3"
# ENS dimension mappings: dimension name -> (abbreviation, color)
DIMENSION_MAPPING = {
"trazabilidad": ("T", colors.Color(0.26, 0.52, 0.96)),
"autenticidad": ("A", colors.Color(0.30, 0.69, 0.31)),
"integridad": ("I", colors.Color(0.61, 0.15, 0.69)),
"confidencialidad": ("C", colors.Color(0.96, 0.26, 0.21)),
"disponibilidad": ("D", colors.Color(1.0, 0.60, 0.0)),
}
# ENS tipo icons
TIPO_ICONS = {
"requisito": "\u26a0\ufe0f",
"refuerzo": "\U0001f6e1\ufe0f",
"recomendacion": "\U0001f4a1",
"medida": "\U0001f4cb",
}
# Dimension names for charts (Spanish)
DIMENSION_NAMES = [
"Trazabilidad",
"Autenticidad",
"Integridad",
"Confidencialidad",
"Disponibilidad",
]
DIMENSION_KEYS = [
"trazabilidad",
"autenticidad",
"integridad",
"confidencialidad",
"disponibilidad",
]
# ENS nivel and tipo order
ENS_NIVEL_ORDER = ["alto", "medio", "bajo", "opcional"]
ENS_TIPO_ORDER = ["requisito", "refuerzo", "recomendacion", "medida"]
# ThreatScore sections
THREATSCORE_SECTIONS = [
"1. IAM",
"2. Attack Surface",
"3. Logging and Monitoring",
"4. Encryption",
]
# NIS2 sections
NIS2_SECTIONS = [
"1",
"2",
"3",
"4",
"5",
"6",
"7",
"9",
"11",
"12",
]
NIS2_SECTION_TITLES = {
"1": "1. Policy on Security",
"2": "2. Risk Management",
"3": "3. Incident Handling",
"4": "4. Business Continuity",
"5": "5. Supply Chain",
"6": "6. Acquisition & Dev",
"7": "7. Effectiveness",
"9": "9. Cryptography",
"11": "11. Access Control",
"12": "12. Asset Management",
}
# Table column widths
COL_WIDTH_SMALL = 0.4 * inch
COL_WIDTH_MEDIUM = 0.9 * inch
COL_WIDTH_LARGE = 1.5 * inch
COL_WIDTH_XLARGE = 2 * inch
COL_WIDTH_XXLARGE = 3 * inch
# Common padding values
PADDING_SMALL = 4
PADDING_MEDIUM = 6
PADDING_LARGE = 8
PADDING_XLARGE = 10
@dataclass
class FrameworkConfig:
"""
Configuration for a compliance framework PDF report.
This dataclass defines all the configurable aspects of a compliance framework
report, including visual styling, metadata fields, and feature flags.
Attributes:
name (str): Internal framework identifier (e.g., "prowler_threatscore").
display_name (str): Human-readable framework name for the report title.
logo_filename (str | None): Optional filename of the framework logo in assets/img/.
primary_color (colors.Color): Main color used for headers and important elements.
secondary_color (colors.Color): Secondary color for sub-headers and accents.
bg_color (colors.Color): Background color for highlighted sections.
attribute_fields (list[str]): List of metadata field names to extract from requirements.
sections (list[str] | None): Optional ordered list of section names for grouping.
language (str): Report language ("en" for English, "es" for Spanish).
has_risk_levels (bool): Whether the framework uses numeric risk levels.
has_dimensions (bool): Whether the framework uses security dimensions (ENS).
has_niveles (bool): Whether the framework uses nivel classification (ENS).
has_weight (bool): Whether requirements have weight values.
"""
name: str
display_name: str
logo_filename: str | None = None
primary_color: colors.Color = field(default_factory=lambda: COLOR_BLUE)
secondary_color: colors.Color = field(default_factory=lambda: COLOR_LIGHT_BLUE)
bg_color: colors.Color = field(default_factory=lambda: COLOR_BG_BLUE)
attribute_fields: list[str] = field(default_factory=list)
sections: list[str] | None = None
language: str = "en"
has_risk_levels: bool = False
has_dimensions: bool = False
has_niveles: bool = False
has_weight: bool = False
FRAMEWORK_REGISTRY: dict[str, FrameworkConfig] = {
"prowler_threatscore": FrameworkConfig(
name="prowler_threatscore",
display_name="Prowler ThreatScore",
logo_filename=None,
primary_color=COLOR_BLUE,
secondary_color=COLOR_LIGHT_BLUE,
bg_color=COLOR_BG_BLUE,
attribute_fields=[
"Title",
"Section",
"SubSection",
"LevelOfRisk",
"Weight",
"AttributeDescription",
"AdditionalInformation",
],
sections=THREATSCORE_SECTIONS,
language="en",
has_risk_levels=True,
has_weight=True,
),
"ens": FrameworkConfig(
name="ens",
display_name="ENS RD2022",
logo_filename="ens_logo.png",
primary_color=COLOR_ENS_ALTO,
secondary_color=COLOR_ENS_MEDIO,
bg_color=COLOR_BG_BLUE,
attribute_fields=[
"IdGrupoControl",
"Marco",
"Categoria",
"DescripcionControl",
"Tipo",
"Nivel",
"Dimensiones",
"ModoEjecucion",
],
sections=None,
language="es",
has_risk_levels=False,
has_dimensions=True,
has_niveles=True,
has_weight=False,
),
"nis2": FrameworkConfig(
name="nis2",
display_name="NIS2 Directive",
logo_filename="nis2_logo.png",
primary_color=COLOR_NIS2_PRIMARY,
secondary_color=COLOR_NIS2_SECONDARY,
bg_color=COLOR_NIS2_BG_BLUE,
attribute_fields=[
"Section",
"SubSection",
"Description",
],
sections=NIS2_SECTIONS,
language="en",
has_risk_levels=False,
has_dimensions=False,
has_niveles=False,
has_weight=False,
),
}
def get_framework_config(compliance_id: str) -> FrameworkConfig | None:
"""
Get framework configuration based on compliance ID.
Args:
compliance_id (str): The compliance framework identifier (e.g., "prowler_threatscore_aws").
Returns:
FrameworkConfig | None: The framework configuration if found, None otherwise.
"""
compliance_lower = compliance_id.lower()
if "threatscore" in compliance_lower:
return FRAMEWORK_REGISTRY["prowler_threatscore"]
if "ens" in compliance_lower:
return FRAMEWORK_REGISTRY["ens"]
if "nis2" in compliance_lower:
return FRAMEWORK_REGISTRY["nis2"]
return None

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,471 @@
import os
from collections import defaultdict
from reportlab.lib.units import inch
from reportlab.platypus import Image, PageBreak, Paragraph, Spacer, Table, TableStyle
from api.models import StatusChoices
from .base import (
BaseComplianceReportGenerator,
ComplianceData,
get_requirement_metadata,
)
from .charts import create_horizontal_bar_chart, get_chart_color_for_percentage
from .config import (
COLOR_BORDER_GRAY,
COLOR_DARK_GRAY,
COLOR_GRAY,
COLOR_GRID_GRAY,
COLOR_HIGH_RISK,
COLOR_NIS2_BG_BLUE,
COLOR_NIS2_PRIMARY,
COLOR_SAFE,
COLOR_WHITE,
NIS2_SECTION_TITLES,
NIS2_SECTIONS,
)
def _extract_section_number(section_string: str) -> str:
"""Extract the section number from a full NIS2 section title.
NIS2 section strings are formatted like:
"1 POLICY ON THE SECURITY OF NETWORK AND INFORMATION SYSTEMS..."
This function extracts just the leading number.
Args:
section_string: Full section title string.
Returns:
Section number as string (e.g., "1", "2", "11").
"""
if not section_string:
return "Other"
parts = section_string.split()
if parts and parts[0].isdigit():
return parts[0]
return "Other"
class NIS2ReportGenerator(BaseComplianceReportGenerator):
"""
PDF report generator for NIS2 Directive (EU) 2022/2555.
This generator creates comprehensive PDF reports containing:
- Cover page with both Prowler and NIS2 logos
- Executive summary with overall compliance score
- Section analysis with horizontal bar chart
- SubSection breakdown table
- Critical failed requirements
- Requirements index organized by section and subsection
- Detailed findings for failed requirements
"""
def create_cover_page(self, data: ComplianceData) -> list:
"""
Create the NIS2 report cover page with both logos.
Args:
data: Aggregated compliance data.
Returns:
List of ReportLab elements.
"""
elements = []
# Create logos side by side
prowler_logo_path = os.path.join(
os.path.dirname(__file__), "../../assets/img/prowler_logo.png"
)
nis2_logo_path = os.path.join(
os.path.dirname(__file__), "../../assets/img/nis2_logo.png"
)
prowler_logo = Image(prowler_logo_path, width=3.5 * inch, height=0.7 * inch)
nis2_logo = Image(nis2_logo_path, width=2.3 * inch, height=1.5 * inch)
logos_table = Table(
[[prowler_logo, nis2_logo]], colWidths=[4 * inch, 2.5 * inch]
)
logos_table.setStyle(
TableStyle(
[
("ALIGN", (0, 0), (0, 0), "LEFT"),
("ALIGN", (1, 0), (1, 0), "RIGHT"),
("VALIGN", (0, 0), (0, 0), "MIDDLE"),
("VALIGN", (1, 0), (1, 0), "MIDDLE"),
]
)
)
elements.append(logos_table)
elements.append(Spacer(1, 0.3 * inch))
# Title
title = Paragraph(
"NIS2 Compliance Report<br/>Directive (EU) 2022/2555",
self.styles["title"],
)
elements.append(title)
elements.append(Spacer(1, 0.3 * inch))
# Compliance metadata table - use base class helper for consistency
info_rows = self._build_info_rows(data, language="en")
# Convert tuples to lists and wrap long text in Paragraphs
metadata_data = []
for label, value in info_rows:
if label in ("Name:", "Description:") and value:
metadata_data.append(
[label, Paragraph(value, self.styles["normal_center"])]
)
else:
metadata_data.append([label, value])
metadata_table = Table(metadata_data, colWidths=[2 * inch, 4 * inch])
metadata_table.setStyle(
TableStyle(
[
("BACKGROUND", (0, 0), (0, -1), COLOR_NIS2_PRIMARY),
("TEXTCOLOR", (0, 0), (0, -1), COLOR_WHITE),
("FONTNAME", (0, 0), (0, -1), "FiraCode"),
("BACKGROUND", (1, 0), (1, -1), COLOR_NIS2_BG_BLUE),
("TEXTCOLOR", (1, 0), (1, -1), COLOR_GRAY),
("FONTNAME", (1, 0), (1, -1), "PlusJakartaSans"),
("ALIGN", (0, 0), (-1, -1), "LEFT"),
("VALIGN", (0, 0), (-1, -1), "TOP"),
("FONTSIZE", (0, 0), (-1, -1), 11),
("GRID", (0, 0), (-1, -1), 1, COLOR_BORDER_GRAY),
("LEFTPADDING", (0, 0), (-1, -1), 10),
("RIGHTPADDING", (0, 0), (-1, -1), 10),
("TOPPADDING", (0, 0), (-1, -1), 8),
("BOTTOMPADDING", (0, 0), (-1, -1), 8),
]
)
)
elements.append(metadata_table)
return elements
def create_executive_summary(self, data: ComplianceData) -> list:
"""
Create the executive summary with compliance metrics.
Args:
data: Aggregated compliance data.
Returns:
List of ReportLab elements.
"""
elements = []
elements.append(Paragraph("Executive Summary", self.styles["h1"]))
elements.append(Spacer(1, 0.1 * inch))
# Calculate statistics
total = len(data.requirements)
passed = sum(1 for r in data.requirements if r.status == StatusChoices.PASS)
failed = sum(1 for r in data.requirements if r.status == StatusChoices.FAIL)
manual = sum(1 for r in data.requirements if r.status == StatusChoices.MANUAL)
# Calculate compliance excluding manual
evaluated = passed + failed
overall_compliance = (passed / evaluated * 100) if evaluated > 0 else 100
# Summary statistics table
summary_data = [
["Metric", "Value"],
["Total Requirements", str(total)],
["Passed ✓", str(passed)],
["Failed ✗", str(failed)],
["Manual ⊙", str(manual)],
["Overall Compliance", f"{overall_compliance:.1f}%"],
]
summary_table = Table(summary_data, colWidths=[3 * inch, 2 * inch])
summary_table.setStyle(
TableStyle(
[
("BACKGROUND", (0, 0), (-1, 0), COLOR_NIS2_PRIMARY),
("TEXTCOLOR", (0, 0), (-1, 0), COLOR_WHITE),
("BACKGROUND", (0, 2), (0, 2), COLOR_SAFE),
("TEXTCOLOR", (0, 2), (0, 2), COLOR_WHITE),
("BACKGROUND", (0, 3), (0, 3), COLOR_HIGH_RISK),
("TEXTCOLOR", (0, 3), (0, 3), COLOR_WHITE),
("BACKGROUND", (0, 4), (0, 4), COLOR_DARK_GRAY),
("TEXTCOLOR", (0, 4), (0, 4), COLOR_WHITE),
("ALIGN", (0, 0), (-1, -1), "CENTER"),
("FONTNAME", (0, 0), (-1, 0), "PlusJakartaSans"),
("FONTSIZE", (0, 0), (-1, 0), 12),
("FONTSIZE", (0, 1), (-1, -1), 10),
("BOTTOMPADDING", (0, 0), (-1, 0), 10),
("GRID", (0, 0), (-1, -1), 0.5, COLOR_BORDER_GRAY),
(
"ROWBACKGROUNDS",
(1, 1),
(1, -1),
[COLOR_WHITE, COLOR_NIS2_BG_BLUE],
),
]
)
)
elements.append(summary_table)
return elements
def create_charts_section(self, data: ComplianceData) -> list:
"""
Create the charts section with section analysis.
Args:
data: Aggregated compliance data.
Returns:
List of ReportLab elements.
"""
elements = []
# Section chart
elements.append(Paragraph("Compliance by Section", self.styles["h1"]))
elements.append(Spacer(1, 0.1 * inch))
elements.append(
Paragraph(
"The following chart shows compliance percentage for each main section "
"of the NIS2 directive:",
self.styles["normal_center"],
)
)
elements.append(Spacer(1, 0.1 * inch))
chart_buffer = self._create_section_chart(data)
chart_buffer.seek(0)
chart_image = Image(chart_buffer, width=6.5 * inch, height=5 * inch)
elements.append(chart_image)
elements.append(PageBreak())
# SubSection breakdown table
elements.append(Paragraph("SubSection Breakdown", self.styles["h1"]))
elements.append(Spacer(1, 0.1 * inch))
subsection_table = self._create_subsection_table(data)
elements.append(subsection_table)
return elements
def create_requirements_index(self, data: ComplianceData) -> list:
"""
Create the requirements index organized by section and subsection.
Args:
data: Aggregated compliance data.
Returns:
List of ReportLab elements.
"""
elements = []
elements.append(Paragraph("Requirements Index", self.styles["h1"]))
elements.append(Spacer(1, 0.1 * inch))
# Organize by section number and subsection
sections = {}
for req in data.requirements:
m = get_requirement_metadata(req.id, data.attributes_by_requirement_id)
if m:
full_section = getattr(m, "Section", "Other")
# Extract section number from full title (e.g., "1 POLICY..." -> "1")
section_num = _extract_section_number(full_section)
subsection = getattr(m, "SubSection", "")
description = getattr(m, "Description", req.description)
if section_num not in sections:
sections[section_num] = {}
if subsection not in sections[section_num]:
sections[section_num][subsection] = []
sections[section_num][subsection].append(
{
"id": req.id,
"description": description,
"status": req.status,
}
)
# Sort by NIS2 section order
for section in NIS2_SECTIONS:
if section not in sections:
continue
section_title = NIS2_SECTION_TITLES.get(section, f"Section {section}")
elements.append(Paragraph(section_title, self.styles["h2"]))
for subsection_name, reqs in sections[section].items():
if subsection_name:
# Truncate long subsection names for display
display_subsection = (
subsection_name[:80] + "..."
if len(subsection_name) > 80
else subsection_name
)
elements.append(Paragraph(display_subsection, self.styles["h3"]))
for req in reqs:
status_indicator = (
"" if req["status"] == StatusChoices.PASS else ""
)
if req["status"] == StatusChoices.MANUAL:
status_indicator = ""
desc = (
req["description"][:60] + "..."
if len(req["description"]) > 60
else req["description"]
)
elements.append(
Paragraph(
f"{status_indicator} {req['id']}: {desc}",
self.styles["normal"],
)
)
elements.append(Spacer(1, 0.1 * inch))
return elements
def _create_section_chart(self, data: ComplianceData):
"""
Create the section compliance chart.
Args:
data: Aggregated compliance data.
Returns:
BytesIO buffer containing the chart image.
"""
section_scores = defaultdict(lambda: {"passed": 0, "total": 0})
for req in data.requirements:
if req.status == StatusChoices.MANUAL:
continue
m = get_requirement_metadata(req.id, data.attributes_by_requirement_id)
if m:
full_section = getattr(m, "Section", "Other")
# Extract section number from full title (e.g., "1 POLICY..." -> "1")
section_num = _extract_section_number(full_section)
section_scores[section_num]["total"] += 1
if req.status == StatusChoices.PASS:
section_scores[section_num]["passed"] += 1
# Build labels and values in NIS2 section order
labels = []
values = []
for section in NIS2_SECTIONS:
if section in section_scores and section_scores[section]["total"] > 0:
scores = section_scores[section]
pct = (scores["passed"] / scores["total"]) * 100
section_title = NIS2_SECTION_TITLES.get(section, f"Section {section}")
labels.append(section_title)
values.append(pct)
return create_horizontal_bar_chart(
labels=labels,
values=values,
xlabel="Compliance (%)",
color_func=get_chart_color_for_percentage,
)
def _create_subsection_table(self, data: ComplianceData) -> Table:
"""
Create the subsection breakdown table.
Args:
data: Aggregated compliance data.
Returns:
ReportLab Table element.
"""
subsection_scores = defaultdict(lambda: {"passed": 0, "failed": 0, "manual": 0})
for req in data.requirements:
m = get_requirement_metadata(req.id, data.attributes_by_requirement_id)
if m:
full_section = getattr(m, "Section", "")
subsection = getattr(m, "SubSection", "")
# Use section number + subsection for grouping
section_num = _extract_section_number(full_section)
# Create a shorter key using section number
if subsection:
# Extract subsection number if present (e.g., "1.1 Policy..." -> "1.1")
subsection_parts = subsection.split()
if subsection_parts:
key = subsection_parts[0] # Just the number like "1.1"
else:
key = f"{section_num}"
else:
key = section_num
if req.status == StatusChoices.PASS:
subsection_scores[key]["passed"] += 1
elif req.status == StatusChoices.FAIL:
subsection_scores[key]["failed"] += 1
else:
subsection_scores[key]["manual"] += 1
table_data = [["Section", "Passed", "Failed", "Manual", "Compliance"]]
for key, scores in sorted(
subsection_scores.items(), key=lambda x: self._sort_section_key(x[0])
):
total = scores["passed"] + scores["failed"]
pct = (scores["passed"] / total * 100) if total > 0 else 100
table_data.append(
[
key,
str(scores["passed"]),
str(scores["failed"]),
str(scores["manual"]),
f"{pct:.1f}%",
]
)
table = Table(
table_data,
colWidths=[1.2 * inch, 0.9 * inch, 0.9 * inch, 0.9 * inch, 1.2 * inch],
)
table.setStyle(
TableStyle(
[
("BACKGROUND", (0, 0), (-1, 0), COLOR_NIS2_PRIMARY),
("TEXTCOLOR", (0, 0), (-1, 0), COLOR_WHITE),
("FONTNAME", (0, 0), (-1, 0), "FiraCode"),
("FONTSIZE", (0, 0), (-1, 0), 10),
("ALIGN", (0, 0), (-1, -1), "CENTER"),
("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
("FONTSIZE", (0, 1), (-1, -1), 9),
("GRID", (0, 0), (-1, -1), 0.5, COLOR_GRID_GRAY),
("LEFTPADDING", (0, 0), (-1, -1), 6),
("RIGHTPADDING", (0, 0), (-1, -1), 6),
("TOPPADDING", (0, 0), (-1, -1), 4),
("BOTTOMPADDING", (0, 0), (-1, -1), 4),
(
"ROWBACKGROUNDS",
(0, 1),
(-1, -1),
[COLOR_WHITE, COLOR_NIS2_BG_BLUE],
),
]
)
)
return table
def _sort_section_key(self, key: str) -> tuple:
"""Sort section keys numerically (e.g., 1, 1.1, 1.2, 2, 11)."""
parts = key.split(".")
result = []
for part in parts:
try:
result.append(int(part))
except ValueError:
result.append(float("inf"))
return tuple(result)

View File

@@ -0,0 +1,438 @@
from reportlab.lib import colors
from reportlab.lib.units import inch
from reportlab.platypus import Image, PageBreak, Paragraph, Spacer, Table, TableStyle
from api.models import StatusChoices
from .base import (
BaseComplianceReportGenerator,
ComplianceData,
get_requirement_metadata,
)
from .charts import create_vertical_bar_chart, get_chart_color_for_percentage
from .components import get_color_for_compliance, get_color_for_weight
from .config import COLOR_HIGH_RISK, COLOR_WHITE
class ThreatScoreReportGenerator(BaseComplianceReportGenerator):
"""
PDF report generator for Prowler ThreatScore framework.
This generator creates comprehensive PDF reports containing:
- Compliance overview and metadata
- Section-by-section compliance scores with charts
- Overall ThreatScore calculation
- Critical failed requirements
- Detailed findings for each requirement
"""
def create_executive_summary(self, data: ComplianceData) -> list:
"""
Create the executive summary section with ThreatScore calculation.
Args:
data: Aggregated compliance data.
Returns:
List of ReportLab elements.
"""
elements = []
elements.append(Paragraph("Compliance Score by Sections", self.styles["h1"]))
elements.append(Spacer(1, 0.2 * inch))
# Create section score chart
chart_buffer = self._create_section_score_chart(data)
chart_image = Image(chart_buffer, width=7 * inch, height=5.5 * inch)
elements.append(chart_image)
# Calculate overall ThreatScore
overall_compliance = self._calculate_threatscore(data)
elements.append(Spacer(1, 0.3 * inch))
# Summary table
summary_data = [["ThreatScore:", f"{overall_compliance:.2f}%"]]
compliance_color = get_color_for_compliance(overall_compliance)
summary_table = Table(summary_data, colWidths=[2.5 * inch, 2 * inch])
summary_table.setStyle(
TableStyle(
[
("BACKGROUND", (0, 0), (0, 0), colors.Color(0.1, 0.3, 0.5)),
("TEXTCOLOR", (0, 0), (0, 0), colors.white),
("FONTNAME", (0, 0), (0, 0), "FiraCode"),
("FONTSIZE", (0, 0), (0, 0), 12),
("BACKGROUND", (1, 0), (1, 0), compliance_color),
("TEXTCOLOR", (1, 0), (1, 0), colors.white),
("FONTNAME", (1, 0), (1, 0), "FiraCode"),
("FONTSIZE", (1, 0), (1, 0), 16),
("ALIGN", (0, 0), (-1, -1), "CENTER"),
("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
("GRID", (0, 0), (-1, -1), 1.5, colors.Color(0.5, 0.6, 0.7)),
("LEFTPADDING", (0, 0), (-1, -1), 12),
("RIGHTPADDING", (0, 0), (-1, -1), 12),
("TOPPADDING", (0, 0), (-1, -1), 10),
("BOTTOMPADDING", (0, 0), (-1, -1), 10),
]
)
)
elements.append(summary_table)
return elements
def create_charts_section(self, data: ComplianceData) -> list:
"""
Create the critical failed requirements section.
Args:
data: Aggregated compliance data.
Returns:
List of ReportLab elements.
"""
elements = []
min_risk_level = getattr(self, "_min_risk_level", 4)
# Start on a new page
elements.append(PageBreak())
elements.append(
Paragraph("Top Requirements by Level of Risk", self.styles["h1"])
)
elements.append(Spacer(1, 0.1 * inch))
elements.append(
Paragraph(
f"Critical Failed Requirements (Risk Level ≥ {min_risk_level})",
self.styles["h2"],
)
)
elements.append(Spacer(1, 0.2 * inch))
critical_failed = self._get_critical_failed_requirements(data, min_risk_level)
if not critical_failed:
elements.append(
Paragraph(
"✅ No critical failed requirements found. Great job!",
self.styles["normal"],
)
)
else:
elements.append(
Paragraph(
f"Found {len(critical_failed)} critical failed requirements "
"that require immediate attention:",
self.styles["normal"],
)
)
elements.append(Spacer(1, 0.5 * inch))
table = self._create_critical_requirements_table(critical_failed)
elements.append(table)
return elements
def create_requirements_index(self, data: ComplianceData) -> list:
"""
Create the requirements index organized by section and subsection.
Args:
data: Aggregated compliance data.
Returns:
List of ReportLab elements.
"""
elements = []
elements.append(Paragraph("Requirements Index", self.styles["h1"]))
# Organize requirements by section and subsection
sections = {}
for req_id in data.attributes_by_requirement_id:
m = get_requirement_metadata(req_id, data.attributes_by_requirement_id)
if m:
section = getattr(m, "Section", "N/A")
subsection = getattr(m, "SubSection", "N/A")
title = getattr(m, "Title", "N/A")
if section not in sections:
sections[section] = {}
if subsection not in sections[section]:
sections[section][subsection] = []
sections[section][subsection].append({"id": req_id, "title": title})
section_num = 1
for section_name, subsections in sections.items():
elements.append(
Paragraph(f"{section_num}. {section_name}", self.styles["h2"])
)
for subsection_name, requirements in subsections.items():
elements.append(Paragraph(f"{subsection_name}", self.styles["h3"]))
for req in requirements:
elements.append(
Paragraph(
f"{req['id']} - {req['title']}", self.styles["normal"]
)
)
section_num += 1
elements.append(Spacer(1, 0.1 * inch))
return elements
def _create_section_score_chart(self, data: ComplianceData):
"""
Create the section compliance score chart using weighted ThreatScore formula.
The section score uses the same weighted formula as the overall ThreatScore:
Score = Σ(rate_i * total_findings_i * weight_i * rfac_i) / Σ(total_findings_i * weight_i * rfac_i)
Where rfac_i = 1 + 0.25 * risk_level
Sections without findings are shown with 100% score.
Args:
data: Aggregated compliance data.
Returns:
BytesIO buffer containing the chart image.
"""
# First, collect ALL sections from requirements (including those without findings)
all_sections = set()
sections_data = {}
for req in data.requirements:
m = get_requirement_metadata(req.id, data.attributes_by_requirement_id)
if m:
section = getattr(m, "Section", "Other")
all_sections.add(section)
# Only calculate scores for requirements with findings
if req.total_findings == 0:
continue
risk_level_raw = getattr(m, "LevelOfRisk", 0)
weight_raw = getattr(m, "Weight", 0)
# Ensure numeric types for calculations (compliance data may have str)
try:
risk_level = int(risk_level_raw) if risk_level_raw else 0
except (ValueError, TypeError):
risk_level = 0
try:
weight = int(weight_raw) if weight_raw else 0
except (ValueError, TypeError):
weight = 0
# ThreatScore formula components
rate_i = req.passed_findings / req.total_findings
rfac_i = 1 + 0.25 * risk_level
if section not in sections_data:
sections_data[section] = {
"numerator": 0,
"denominator": 0,
}
sections_data[section]["numerator"] += (
rate_i * req.total_findings * weight * rfac_i
)
sections_data[section]["denominator"] += (
req.total_findings * weight * rfac_i
)
# Calculate percentages for all sections
labels = []
values = []
for section in sorted(all_sections):
if section in sections_data and sections_data[section]["denominator"] > 0:
pct = (
sections_data[section]["numerator"]
/ sections_data[section]["denominator"]
) * 100
else:
# Sections without findings get 100%
pct = 100.0
labels.append(section)
values.append(pct)
return create_vertical_bar_chart(
labels=labels,
values=values,
ylabel="Compliance Score (%)",
xlabel="",
color_func=get_chart_color_for_percentage,
rotation=0,
)
def _calculate_threatscore(self, data: ComplianceData) -> float:
"""
Calculate the overall ThreatScore using the weighted formula.
Args:
data: Aggregated compliance data.
Returns:
Overall ThreatScore percentage.
"""
numerator = 0
denominator = 0
has_findings = False
for req in data.requirements:
if req.total_findings == 0:
continue
has_findings = True
m = get_requirement_metadata(req.id, data.attributes_by_requirement_id)
if m:
risk_level_raw = getattr(m, "LevelOfRisk", 0)
weight_raw = getattr(m, "Weight", 0)
# Ensure numeric types for calculations (compliance data may have str)
try:
risk_level = int(risk_level_raw) if risk_level_raw else 0
except (ValueError, TypeError):
risk_level = 0
try:
weight = int(weight_raw) if weight_raw else 0
except (ValueError, TypeError):
weight = 0
rate_i = req.passed_findings / req.total_findings
rfac_i = 1 + 0.25 * risk_level
numerator += rate_i * req.total_findings * weight * rfac_i
denominator += req.total_findings * weight * rfac_i
if not has_findings:
return 100.0
if denominator > 0:
return (numerator / denominator) * 100
return 0.0
def _get_critical_failed_requirements(
self, data: ComplianceData, min_risk_level: int
) -> list[dict]:
"""
Get critical failed requirements sorted by risk level and weight.
Args:
data: Aggregated compliance data.
min_risk_level: Minimum risk level threshold.
Returns:
List of critical failed requirement dictionaries.
"""
critical = []
for req in data.requirements:
if req.status != StatusChoices.FAIL:
continue
m = get_requirement_metadata(req.id, data.attributes_by_requirement_id)
if m:
risk_level_raw = getattr(m, "LevelOfRisk", 0)
weight_raw = getattr(m, "Weight", 0)
# Ensure numeric types for calculations (compliance data may have str)
try:
risk_level = int(risk_level_raw) if risk_level_raw else 0
except (ValueError, TypeError):
risk_level = 0
try:
weight = int(weight_raw) if weight_raw else 0
except (ValueError, TypeError):
weight = 0
if risk_level >= min_risk_level:
critical.append(
{
"id": req.id,
"risk_level": risk_level,
"weight": weight,
"title": getattr(m, "Title", "N/A"),
"section": getattr(m, "Section", "N/A"),
}
)
critical.sort(key=lambda x: (x["risk_level"], x["weight"]), reverse=True)
return critical
def _create_critical_requirements_table(self, critical_requirements: list) -> Table:
"""
Create the critical requirements table.
Args:
critical_requirements: List of critical requirement dictionaries.
Returns:
ReportLab Table element.
"""
table_data = [["Risk", "Weight", "Requirement ID", "Title", "Section"]]
for req in critical_requirements:
title = req["title"]
if len(title) > 50:
title = title[:47] + "..."
table_data.append(
[
str(req["risk_level"]),
str(req["weight"]),
req["id"],
title,
req["section"],
]
)
table = Table(
table_data,
colWidths=[0.7 * inch, 0.9 * inch, 1.3 * inch, 3.1 * inch, 1.5 * inch],
)
table.setStyle(
TableStyle(
[
("BACKGROUND", (0, 0), (-1, 0), COLOR_HIGH_RISK),
("TEXTCOLOR", (0, 0), (-1, 0), COLOR_WHITE),
("FONTNAME", (0, 0), (-1, 0), "FiraCode"),
("FONTSIZE", (0, 0), (-1, 0), 10),
("BACKGROUND", (0, 1), (0, -1), COLOR_HIGH_RISK),
("TEXTCOLOR", (0, 1), (0, -1), COLOR_WHITE),
("FONTNAME", (0, 1), (0, -1), "FiraCode"),
("ALIGN", (0, 1), (0, -1), "CENTER"),
("FONTSIZE", (0, 1), (0, -1), 12),
("ALIGN", (1, 1), (1, -1), "CENTER"),
("FONTNAME", (1, 1), (1, -1), "FiraCode"),
("FONTNAME", (2, 1), (2, -1), "FiraCode"),
("FONTSIZE", (2, 1), (2, -1), 9),
("FONTNAME", (3, 1), (-1, -1), "PlusJakartaSans"),
("FONTSIZE", (3, 1), (-1, -1), 8),
("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
("GRID", (0, 0), (-1, -1), 1, colors.Color(0.7, 0.7, 0.7)),
("LEFTPADDING", (0, 0), (-1, -1), 6),
("RIGHTPADDING", (0, 0), (-1, -1), 6),
("TOPPADDING", (0, 0), (-1, -1), 8),
("BOTTOMPADDING", (0, 0), (-1, -1), 8),
("BACKGROUND", (1, 1), (-1, -1), colors.Color(0.98, 0.98, 0.98)),
]
)
)
# Color weight column based on value
for idx, req in enumerate(critical_requirements):
row_idx = idx + 1
weight_color = get_color_for_weight(req["weight"])
table.setStyle(
TableStyle(
[
("BACKGROUND", (1, row_idx), (1, row_idx), weight_color),
("TEXTCOLOR", (1, row_idx), (1, row_idx), COLOR_WHITE),
]
)
)
return table

View File

@@ -131,9 +131,18 @@ def compute_threatscore_metrics(
continue
m = metadata[0]
risk_level = getattr(m, "LevelOfRisk", 0)
weight = getattr(m, "Weight", 0)
risk_level_raw = getattr(m, "LevelOfRisk", 0)
weight_raw = getattr(m, "Weight", 0)
section = getattr(m, "Section", "Unknown")
# Ensure numeric types for calculations (compliance data may have str)
try:
risk_level = int(risk_level_raw) if risk_level_raw else 0
except (ValueError, TypeError):
risk_level = 0
try:
weight = int(weight_raw) if weight_raw else 0
except (ValueError, TypeError):
weight = 0
# Calculate ThreatScore components using formula from UI
rate_i = req_passed_findings / req_total_findings

View File

@@ -1,9 +1,6 @@
from collections import defaultdict
from celery.utils.log import get_task_logger
from config.django.base import DJANGO_FINDINGS_BATCH_SIZE
from django.db.models import Count, Q
from tasks.utils import batched
from api.db_router import READ_REPLICA_ALIAS
from api.db_utils import rls_transaction
@@ -154,6 +151,12 @@ def _load_findings_for_requirement_checks(
Supports optional caching to avoid duplicate queries when generating multiple
reports for the same scan.
Memory optimizations:
- Uses database iterator with chunk_size for streaming large result sets
- Shares references between cache and return dict (no duplication)
- Only selects required fields from database
- Processes findings in batches to reduce memory pressure
Args:
tenant_id (str): The tenant ID for Row-Level Security context.
scan_id (str): The ID of the scan to retrieve findings for.
@@ -171,69 +174,73 @@ def _load_findings_for_requirement_checks(
'aws_s3_bucket_public_access': [FindingOutput(...)]
}
"""
findings_by_check_id = defaultdict(list)
if not check_ids:
return dict(findings_by_check_id)
return {}
# Initialize cache if not provided
if findings_cache is None:
findings_cache = {}
# Deduplicate check_ids to avoid redundant processing
unique_check_ids = list(set(check_ids))
# Separate cached and non-cached check_ids
check_ids_to_load = []
cache_hits = 0
cache_misses = 0
for check_id in check_ids:
for check_id in unique_check_ids:
if check_id in findings_cache:
# Reuse from cache
findings_by_check_id[check_id] = findings_cache[check_id]
cache_hits += 1
else:
# Need to load from database
check_ids_to_load.append(check_id)
cache_misses += 1
if cache_hits > 0:
total_checks = len(unique_check_ids)
logger.info(
f"Findings cache: {cache_hits} hits, {cache_misses} misses "
f"({cache_hits / (cache_hits + cache_misses) * 100:.1f}% hit rate)"
f"Findings cache: {cache_hits}/{total_checks} hits "
f"({cache_hits / total_checks * 100:.1f}% hit rate)"
)
# If all check_ids were in cache, return early
if not check_ids_to_load:
return dict(findings_by_check_id)
logger.info(f"Loading findings for {len(check_ids_to_load)} checks on-demand")
findings_queryset = (
Finding.all_objects.filter(
tenant_id=tenant_id, scan_id=scan_id, check_id__in=check_ids_to_load
# Load missing check_ids from database
if check_ids_to_load:
logger.info(
f"Loading findings for {len(check_ids_to_load)} checks from database"
)
.order_by("uid")
.iterator()
)
with rls_transaction(tenant_id, using=READ_REPLICA_ALIAS):
for batch, is_last_batch in batched(
findings_queryset, DJANGO_FINDINGS_BATCH_SIZE
):
for finding_model in batch:
with rls_transaction(tenant_id, using=READ_REPLICA_ALIAS):
# Use iterator with chunk_size for memory-efficient streaming
# chunk_size controls how many rows Django fetches from DB at once
findings_queryset = (
Finding.all_objects.filter(
tenant_id=tenant_id,
scan_id=scan_id,
check_id__in=check_ids_to_load,
)
.order_by("check_id", "uid")
.iterator(chunk_size=DJANGO_FINDINGS_BATCH_SIZE)
)
# Pre-initialize empty lists for all check_ids to load
# This avoids repeated dict lookups and 'if not in' checks
for check_id in check_ids_to_load:
findings_cache[check_id] = []
findings_count = 0
for finding_model in findings_queryset:
finding_output = FindingOutput.transform_api_finding(
finding_model, prowler_provider
)
findings_by_check_id[finding_output.check_id].append(finding_output)
# Update cache with newly loaded findings
if finding_output.check_id not in findings_cache:
findings_cache[finding_output.check_id] = []
findings_cache[finding_output.check_id].append(finding_output)
findings_count += 1
total_findings_loaded = sum(
len(findings) for findings in findings_by_check_id.values()
)
logger.info(
f"Loaded {total_findings_loaded} findings for {len(findings_by_check_id)} checks"
)
logger.info(
f"Loaded {findings_count} findings for {len(check_ids_to_load)} checks"
)
return dict(findings_by_check_id)
# Build result dict using cache references (no data duplication)
# This shares the same list objects between cache and result
result = {
check_id: findings_cache.get(check_id, []) for check_id in unique_check_ids
}
return result

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff