diff --git a/api/CHANGELOG.md b/api/CHANGELOG.md index 6eb74e7f53..bb727840c7 100644 --- a/api/CHANGELOG.md +++ b/api/CHANGELOG.md @@ -7,6 +7,7 @@ All notable changes to the **Prowler API** are documented in this file. ### 🚀 Added - OpenStack provider support [(#10003)](https://github.com/prowler-cloud/prowler/pull/10003) +- PDF report for the CSA CCM compliance framework [(#10088)](https://github.com/prowler-cloud/prowler/pull/10088) ### 🔄 Changed diff --git a/api/src/backend/api/specs/v1.yaml b/api/src/backend/api/specs/v1.yaml index 88d67c01c1..cee52dcfa5 100644 --- a/api/src/backend/api/specs/v1.yaml +++ b/api/src/backend/api/specs/v1.yaml @@ -10747,6 +10747,72 @@ paths: description: CSV file containing the compliance report '404': description: Compliance report not found + /api/v1/scans/{id}/csa: + get: + operationId: scans_csa_retrieve + description: Download CSA Cloud Controls Matrix (CCM) v4.0 compliance report + as a PDF file. + summary: Retrieve CSA CCM compliance report + parameters: + - in: query + name: fields[scans] + schema: + type: array + items: + type: string + enum: + - name + - trigger + - state + - unique_resource_count + - progress + - duration + - provider + - task + - inserted_at + - started_at + - completed_at + - scheduled_at + - next_scan_at + - processor + - url + description: endpoint return only specific fields in the response on a per-type + basis by including a fields[TYPE] query parameter. + explode: false + - in: path + name: id + schema: + type: string + format: uuid + description: A UUID string identifying this scan. + required: true + - in: query + name: include + schema: + type: array + items: + type: string + enum: + - provider + description: include query parameter to allow the client to customize which + related resources should be returned. + explode: false + tags: + - Scan + security: + - JWT or API Key: [] + responses: + '200': + description: PDF file containing the CSA CCM compliance report + '202': + description: The task is in progress + '401': + description: API key missing or user not Authenticated + '403': + description: There is a problem with credentials + '404': + description: The scan has no CSA CCM reports, or the CSA CCM report generation + task has not started yet /api/v1/scans/{id}/ens: get: operationId: scans_ens_retrieve @@ -17468,13 +17534,13 @@ components: description: OpenStack region name (e.g., RegionOne). identity_api_version: type: string - description: Keystone API version (default: 3). + description: 'Keystone API version (default: 3).' user_domain_name: type: string - description: User domain name (default: Default). + description: 'User domain name (default: Default).' project_domain_name: type: string - description: Project domain name (default: Default). + description: 'Project domain name (default: Default).' required: - auth_url - username @@ -19546,13 +19612,13 @@ components: description: OpenStack region name (e.g., RegionOne). identity_api_version: type: string - description: Keystone API version (default: 3). + description: 'Keystone API version (default: 3).' user_domain_name: type: string - description: User domain name (default: Default). + description: 'User domain name (default: Default).' project_domain_name: type: string - description: Project domain name (default: Default). + description: 'Project domain name (default: Default).' required: - auth_url - username @@ -19976,13 +20042,13 @@ components: description: OpenStack region name (e.g., RegionOne). identity_api_version: type: string - description: Keystone API version (default: 3). + description: 'Keystone API version (default: 3).' user_domain_name: type: string - description: User domain name (default: Default). + description: 'User domain name (default: Default).' project_domain_name: type: string - description: Project domain name (default: Default). + description: 'Project domain name (default: Default).' required: - auth_url - username @@ -20416,13 +20482,13 @@ components: description: OpenStack region name (e.g., RegionOne). identity_api_version: type: string - description: Keystone API version (default: 3). + description: 'Keystone API version (default: 3).' user_domain_name: type: string - description: User domain name (default: Default). + description: 'User domain name (default: Default).' project_domain_name: type: string - description: Project domain name (default: Default). + description: 'Project domain name (default: Default).' required: - auth_url - username diff --git a/api/src/backend/api/v1/views.py b/api/src/backend/api/v1/views.py index bba1689f7d..044682ce56 100644 --- a/api/src/backend/api/v1/views.py +++ b/api/src/backend/api/v1/views.py @@ -1759,6 +1759,25 @@ class ProviderViewSet(DisablePaginationMixin, BaseRLSViewSet): ), }, ), + csa=extend_schema( + tags=["Scan"], + summary="Retrieve CSA CCM compliance report", + description="Download CSA Cloud Controls Matrix (CCM) v4.0 compliance report as a PDF file.", + request=None, + responses={ + 200: OpenApiResponse( + description="PDF file containing the CSA CCM compliance report" + ), + 202: OpenApiResponse(description="The task is in progress"), + 401: OpenApiResponse( + description="API key missing or user not Authenticated" + ), + 403: OpenApiResponse(description="There is a problem with credentials"), + 404: OpenApiResponse( + description="The scan has no CSA CCM reports, or the CSA CCM report generation task has not started yet" + ), + }, + ), ) @method_decorator(CACHE_DECORATOR, name="list") @method_decorator(CACHE_DECORATOR, name="retrieve") @@ -1824,6 +1843,9 @@ class ScanViewSet(BaseRLSViewSet): elif self.action == "nis2": if hasattr(self, "response_serializer_class"): return self.response_serializer_class + elif self.action == "csa": + if hasattr(self, "response_serializer_class"): + return self.response_serializer_class return super().get_serializer_class() def partial_update(self, request, *args, **kwargs): @@ -2185,6 +2207,45 @@ class ScanViewSet(BaseRLSViewSet): content, filename = loader return self._serve_file(content, filename, "application/pdf") + @action( + detail=True, + methods=["get"], + url_name="csa", + ) + def csa(self, request, pk=None): + scan = self.get_object() + running_resp = self._get_task_status(scan) + if running_resp: + return running_resp + + if not scan.output_location: + return Response( + { + "detail": "The scan has no reports, or the CSA CCM report generation task has not started yet." + }, + status=status.HTTP_404_NOT_FOUND, + ) + + if scan.output_location.startswith("s3://"): + bucket = env.str("DJANGO_OUTPUT_S3_AWS_OUTPUT_BUCKET", "") + key_prefix = scan.output_location.removeprefix(f"s3://{bucket}/") + prefix = os.path.join( + os.path.dirname(key_prefix), + "csa", + "*_csa_report.pdf", + ) + loader = self._load_file(prefix, s3=True, bucket=bucket, list_objects=True) + else: + base = os.path.dirname(scan.output_location) + pattern = os.path.join(base, "csa", "*_csa_report.pdf") + loader = self._load_file(pattern, s3=False) + + if isinstance(loader, Response): + return loader + + content, filename = loader + return self._serve_file(content, filename, "application/pdf") + def create(self, request, *args, **kwargs): input_serializer = self.get_serializer(data=request.data) input_serializer.is_valid(raise_exception=True) diff --git a/api/src/backend/tasks/jobs/report.py b/api/src/backend/tasks/jobs/report.py index de022e9afd..a41a8d6292 100644 --- a/api/src/backend/tasks/jobs/report.py +++ b/api/src/backend/tasks/jobs/report.py @@ -6,6 +6,7 @@ from config.django.base import DJANGO_TMP_OUTPUT_DIRECTORY from tasks.jobs.export import _generate_compliance_output_directory, _upload_to_s3 from tasks.jobs.reports import ( FRAMEWORK_REGISTRY, + CSAReportGenerator, ENSReportGenerator, NIS2ReportGenerator, ThreatScoreReportGenerator, @@ -147,6 +148,49 @@ def generate_nis2_report( ) +def generate_csa_report( + tenant_id: str, + scan_id: str, + compliance_id: str, + output_path: str, + provider_id: str, + only_failed: bool = True, + include_manual: bool = False, + provider_obj: Provider | None = None, + requirement_statistics: dict[str, dict[str, int]] | None = None, + findings_cache: dict[str, list[FindingOutput]] | None = None, +) -> None: + """ + Generate a PDF compliance report for CSA Cloud Controls Matrix (CCM) v4.0. + + Args: + tenant_id: The tenant ID for Row-Level Security context. + scan_id: ID of the scan executed by Prowler. + compliance_id: ID of the compliance framework (e.g., "csa_ccm_4.0_aws"). + output_path: Output PDF file path. + provider_id: Provider ID for the scan. + only_failed: If True, only include failed requirements in detailed section. + include_manual: If True, include manual requirements in detailed section. + provider_obj: Pre-fetched Provider object to avoid duplicate queries. + requirement_statistics: Pre-aggregated requirement statistics. + findings_cache: Cache of already loaded findings to avoid duplicate queries. + """ + generator = CSAReportGenerator(FRAMEWORK_REGISTRY["csa_ccm"]) + + generator.generate( + tenant_id=tenant_id, + scan_id=scan_id, + compliance_id=compliance_id, + output_path=output_path, + provider_id=provider_id, + provider_obj=provider_obj, + requirement_statistics=requirement_statistics, + findings_cache=findings_cache, + only_failed=only_failed, + include_manual=include_manual, + ) + + def generate_compliance_reports( tenant_id: str, scan_id: str, @@ -154,11 +198,14 @@ def generate_compliance_reports( generate_threatscore: bool = True, generate_ens: bool = True, generate_nis2: bool = True, + generate_csa: bool = True, only_failed_threatscore: bool = True, min_risk_level_threatscore: int = 4, include_manual_ens: bool = True, include_manual_nis2: bool = False, only_failed_nis2: bool = True, + only_failed_csa: bool = True, + include_manual_csa: bool = False, ) -> dict[str, dict[str, bool | str]]: """ Generate multiple compliance reports with shared database queries. @@ -175,23 +222,27 @@ def generate_compliance_reports( generate_threatscore: Whether to generate ThreatScore report. generate_ens: Whether to generate ENS report. generate_nis2: Whether to generate NIS2 report. + generate_csa: Whether to generate CSA CCM report. only_failed_threatscore: For ThreatScore, only include failed requirements. min_risk_level_threatscore: Minimum risk level for ThreatScore critical requirements. include_manual_ens: For ENS, include manual requirements. include_manual_nis2: For NIS2, include manual requirements. only_failed_nis2: For NIS2, only include failed requirements. + only_failed_csa: For CSA CCM, only include failed requirements. + include_manual_csa: For CSA CCM, include manual requirements. Returns: Dictionary with results for each report type. """ logger.info( "Generating compliance reports for scan %s with provider %s" - " (ThreatScore: %s, ENS: %s, NIS2: %s)", + " (ThreatScore: %s, ENS: %s, NIS2: %s, CSA: %s)", scan_id, provider_id, generate_threatscore, generate_ens, generate_nis2, + generate_csa, ) results = {} @@ -206,6 +257,8 @@ def generate_compliance_reports( results["ens"] = {"upload": False, "path": ""} if generate_nis2: results["nis2"] = {"upload": False, "path": ""} + if generate_csa: + results["csa"] = {"upload": False, "path": ""} return results provider_obj = Provider.objects.get(id=provider_id) @@ -235,7 +288,23 @@ def generate_compliance_reports( results["nis2"] = {"upload": False, "path": ""} generate_nis2 = False - if not generate_threatscore and not generate_ens and not generate_nis2: + if generate_csa and provider_type not in [ + "aws", + "azure", + "gcp", + "oraclecloud", + "alibabacloud", + ]: + logger.info("Provider %s not supported for CSA CCM report", provider_type) + results["csa"] = {"upload": False, "path": ""} + generate_csa = False + + if ( + not generate_threatscore + and not generate_ens + and not generate_nis2 + and not generate_csa + ): return results # Aggregate requirement statistics once @@ -274,6 +343,13 @@ def generate_compliance_reports( scan_id, compliance_framework="nis2", ) + csa_path = _generate_compliance_output_directory( + DJANGO_TMP_OUTPUT_DIRECTORY, + provider_uid, + tenant_id, + scan_id, + compliance_framework="csa", + ) out_dir = str(Path(threatscore_path).parent.parent) except Exception as e: logger.error("Error generating output directory: %s", e) @@ -284,6 +360,8 @@ def generate_compliance_reports( results["ens"] = error_dict.copy() if generate_nis2: results["nis2"] = error_dict.copy() + if generate_csa: + results["csa"] = error_dict.copy() return results # Generate ThreatScore report @@ -456,6 +534,41 @@ def generate_compliance_reports( logger.error("Error generating NIS2 report: %s", e) results["nis2"] = {"upload": False, "path": "", "error": str(e)} + # Generate CSA CCM report + if generate_csa: + compliance_id_csa = f"csa_ccm_4.0_{provider_type}" + pdf_path_csa = f"{csa_path}_csa_report.pdf" + logger.info("Generating CSA CCM report with compliance %s", compliance_id_csa) + + try: + generate_csa_report( + tenant_id=tenant_id, + scan_id=scan_id, + compliance_id=compliance_id_csa, + output_path=pdf_path_csa, + provider_id=provider_id, + only_failed=only_failed_csa, + include_manual=include_manual_csa, + provider_obj=provider_obj, + requirement_statistics=requirement_statistics, + findings_cache=findings_cache, + ) + + upload_uri_csa = _upload_to_s3( + tenant_id, scan_id, pdf_path_csa, f"csa/{Path(pdf_path_csa).name}" + ) + + if upload_uri_csa: + results["csa"] = {"upload": True, "path": upload_uri_csa} + logger.info("CSA CCM report uploaded to %s", upload_uri_csa) + else: + results["csa"] = {"upload": False, "path": out_dir} + logger.warning("CSA CCM report saved locally at %s", out_dir) + + except Exception as e: + logger.error("Error generating CSA CCM report: %s", e) + results["csa"] = {"upload": False, "path": "", "error": str(e)} + # Clean up temporary files if all reports were uploaded successfully all_uploaded = all( result.get("upload", False) @@ -481,6 +594,7 @@ def generate_compliance_reports_job( generate_threatscore: bool = True, generate_ens: bool = True, generate_nis2: bool = True, + generate_csa: bool = True, ) -> dict[str, dict[str, bool | str]]: """ Celery task wrapper for generate_compliance_reports. @@ -492,6 +606,7 @@ def generate_compliance_reports_job( generate_threatscore: Whether to generate ThreatScore report. generate_ens: Whether to generate ENS report. generate_nis2: Whether to generate NIS2 report. + generate_csa: Whether to generate CSA CCM report. Returns: Dictionary with results for each report type. @@ -503,4 +618,5 @@ def generate_compliance_reports_job( generate_threatscore=generate_threatscore, generate_ens=generate_ens, generate_nis2=generate_nis2, + generate_csa=generate_csa, ) diff --git a/api/src/backend/tasks/jobs/reports/__init__.py b/api/src/backend/tasks/jobs/reports/__init__.py index 60602b93ab..1fc475a467 100644 --- a/api/src/backend/tasks/jobs/reports/__init__.py +++ b/api/src/backend/tasks/jobs/reports/__init__.py @@ -71,6 +71,8 @@ from .config import ( COLOR_PROWLER_DARK_GREEN, COLOR_SAFE, COLOR_WHITE, + CSA_CCM_SECTION_SHORT_NAMES, + CSA_CCM_SECTIONS, DIMENSION_KEYS, DIMENSION_MAPPING, DIMENSION_NAMES, @@ -90,6 +92,7 @@ from .config import ( ) # Framework-specific generators +from .csa import CSAReportGenerator from .ens import ENSReportGenerator from .nis2 import NIS2ReportGenerator from .threatscore import ThreatScoreReportGenerator @@ -105,6 +108,7 @@ __all__ = [ "ThreatScoreReportGenerator", "ENSReportGenerator", "NIS2ReportGenerator", + "CSAReportGenerator", # Configuration "FrameworkConfig", "FRAMEWORK_REGISTRY", @@ -147,6 +151,8 @@ __all__ = [ "THREATSCORE_SECTIONS", "NIS2_SECTIONS", "NIS2_SECTION_TITLES", + "CSA_CCM_SECTIONS", + "CSA_CCM_SECTION_SHORT_NAMES", # Layout constants "COL_WIDTH_SMALL", "COL_WIDTH_MEDIUM", diff --git a/api/src/backend/tasks/jobs/reports/base.py b/api/src/backend/tasks/jobs/reports/base.py index 42776681fb..f348c6d0d2 100644 --- a/api/src/backend/tasks/jobs/reports/base.py +++ b/api/src/backend/tasks/jobs/reports/base.py @@ -662,6 +662,9 @@ class BaseComplianceReportGenerator(ABC): elements.append(create_status_badge(req.status)) elements.append(Spacer(1, 0.1 * inch)) + # Hook for subclasses to add extra detail (e.g., CSA attributes) + elements.extend(self._render_requirement_detail_extras(req, data)) + # Findings for this requirement for check_id in req.checks: elements.append(Paragraph(f"Check: {check_id}", self.styles["h2"])) @@ -701,6 +704,24 @@ class BaseComplianceReportGenerator(ABC): return page_text, "Powered by Prowler" + def _render_requirement_detail_extras( + self, req: RequirementData, data: ComplianceData + ) -> list: + """Hook for subclasses to render extra content in detailed findings. + + Called after the status badge for each requirement in the detailed + findings section. Override in subclasses to add framework-specific + metadata (e.g., CSA CCM attributes). + + Args: + req: The requirement being rendered. + data: Aggregated compliance data. + + Returns: + List of ReportLab elements (empty by default). + """ + return [] + # ========================================================================= # Private Helper Methods # ========================================================================= diff --git a/api/src/backend/tasks/jobs/reports/config.py b/api/src/backend/tasks/jobs/reports/config.py index 0785505820..fe0326980d 100644 --- a/api/src/backend/tasks/jobs/reports/config.py +++ b/api/src/backend/tasks/jobs/reports/config.py @@ -143,6 +143,36 @@ NIS2_SECTION_TITLES = { "12": "12. Asset Management", } +# CSA CCM sections (Cloud Controls Matrix v4.0 domains) +CSA_CCM_SECTIONS = [ + "Application & Interface Security", + "Audit & Assurance", + "Business Continuity Management and Operational Resilience", + "Change Control and Configuration Management", + "Cryptography, Encryption & Key Management", + "Data Security and Privacy Lifecycle Management", + "Datacenter Security", + "Governance, Risk and Compliance", + "Identity & Access Management", + "Infrastructure & Virtualization Security", + "Interoperability & Portability", + "Logging and Monitoring", + "Security Incident Management, E-Discovery, & Cloud Forensics", + "Threat & Vulnerability Management", + "Universal Endpoint Management", +] + +# Short names for CSA CCM sections (used in chart labels) +CSA_CCM_SECTION_SHORT_NAMES = { + "Application & Interface Security": "App & Interface Security", + "Business Continuity Management and Operational Resilience": "Business Continuity", + "Change Control and Configuration Management": "Change Control & Config", + "Cryptography, Encryption & Key Management": "Cryptography & Encryption", + "Data Security and Privacy Lifecycle Management": "Data Security & Privacy", + "Security Incident Management, E-Discovery, & Cloud Forensics": "Incident Mgmt & Forensics", + "Infrastructure & Virtualization Security": "Infrastructure & Virtualization", +} + # Table column widths COL_WIDTH_SMALL = 0.4 * inch COL_WIDTH_MEDIUM = 0.9 * inch @@ -261,6 +291,28 @@ FRAMEWORK_REGISTRY: dict[str, FrameworkConfig] = { has_niveles=False, has_weight=False, ), + "csa_ccm": FrameworkConfig( + name="csa_ccm", + display_name="CSA Cloud Controls Matrix (CCM)", + logo_filename=None, + primary_color=COLOR_BLUE, + secondary_color=COLOR_LIGHT_BLUE, + bg_color=COLOR_BG_BLUE, + attribute_fields=[ + "Section", + "CCMLite", + "IaaS", + "PaaS", + "SaaS", + "ScopeApplicability", + ], + sections=CSA_CCM_SECTIONS, + language="en", + has_risk_levels=False, + has_dimensions=False, + has_niveles=False, + has_weight=False, + ), } @@ -282,5 +334,7 @@ def get_framework_config(compliance_id: str) -> FrameworkConfig | None: return FRAMEWORK_REGISTRY["ens"] if "nis2" in compliance_lower: return FRAMEWORK_REGISTRY["nis2"] + if "csa" in compliance_lower or "ccm" in compliance_lower: + return FRAMEWORK_REGISTRY["csa_ccm"] return None diff --git a/api/src/backend/tasks/jobs/reports/csa.py b/api/src/backend/tasks/jobs/reports/csa.py new file mode 100644 index 0000000000..c55ed198de --- /dev/null +++ b/api/src/backend/tasks/jobs/reports/csa.py @@ -0,0 +1,474 @@ +from collections import defaultdict + +from celery.utils.log import get_task_logger +from reportlab.lib.units import inch +from reportlab.platypus import Image, PageBreak, Paragraph, Spacer, Table, TableStyle + +from api.models import StatusChoices + +from .base import ( + BaseComplianceReportGenerator, + ComplianceData, + get_requirement_metadata, +) +from .charts import create_horizontal_bar_chart, get_chart_color_for_percentage +from .config import ( + COLOR_BG_BLUE, + COLOR_BLUE, + COLOR_BORDER_GRAY, + COLOR_DARK_GRAY, + COLOR_GRID_GRAY, + COLOR_HIGH_RISK, + COLOR_SAFE, + COLOR_WHITE, + CSA_CCM_SECTION_SHORT_NAMES, + CSA_CCM_SECTIONS, +) + +logger = get_task_logger(__name__) + + +class CSAReportGenerator(BaseComplianceReportGenerator): + """ + PDF report generator for CSA Cloud Controls Matrix (CCM) v4.0. + + This generator creates comprehensive PDF reports containing: + - Cover page with Prowler logo + - Executive summary with overall compliance score + - Section analysis with horizontal bar chart + - Section breakdown table + - Requirements index organized by section + - Detailed findings for failed requirements + """ + + def create_executive_summary(self, data: ComplianceData) -> list: + """ + Create the executive summary with compliance metrics. + + Args: + data: Aggregated compliance data. + + Returns: + List of ReportLab elements. + """ + elements = [] + + elements.append(Paragraph("Executive Summary", self.styles["h1"])) + elements.append(Spacer(1, 0.1 * inch)) + + # Calculate statistics + total = len(data.requirements) + passed = sum(1 for r in data.requirements if r.status == StatusChoices.PASS) + failed = sum(1 for r in data.requirements if r.status == StatusChoices.FAIL) + manual = sum(1 for r in data.requirements if r.status == StatusChoices.MANUAL) + + logger.info( + "CSA CCM Executive Summary: total=%d, passed=%d, failed=%d, manual=%d", + total, + passed, + failed, + manual, + ) + + # Log sample of requirements for debugging + for req in data.requirements[:5]: + logger.info( + " Requirement %s: status=%s, passed_findings=%d, total_findings=%d", + req.id, + req.status, + req.passed_findings, + req.total_findings, + ) + + # Calculate compliance excluding manual + evaluated = passed + failed + overall_compliance = (passed / evaluated * 100) if evaluated > 0 else 100 + + # Summary statistics table + summary_data = [ + ["Metric", "Value"], + ["Total Requirements", str(total)], + ["Passed \u2713", str(passed)], + ["Failed \u2717", str(failed)], + ["Manual \u2299", str(manual)], + ["Overall Compliance", f"{overall_compliance:.1f}%"], + ] + + summary_table = Table(summary_data, colWidths=[3 * inch, 2 * inch]) + summary_table.setStyle( + TableStyle( + [ + ("BACKGROUND", (0, 0), (-1, 0), COLOR_BLUE), + ("TEXTCOLOR", (0, 0), (-1, 0), COLOR_WHITE), + ("BACKGROUND", (0, 2), (0, 2), COLOR_SAFE), + ("TEXTCOLOR", (0, 2), (0, 2), COLOR_WHITE), + ("BACKGROUND", (0, 3), (0, 3), COLOR_HIGH_RISK), + ("TEXTCOLOR", (0, 3), (0, 3), COLOR_WHITE), + ("BACKGROUND", (0, 4), (0, 4), COLOR_DARK_GRAY), + ("TEXTCOLOR", (0, 4), (0, 4), COLOR_WHITE), + ("ALIGN", (0, 0), (-1, -1), "CENTER"), + ("FONTNAME", (0, 0), (-1, 0), "PlusJakartaSans"), + ("FONTSIZE", (0, 0), (-1, 0), 12), + ("FONTSIZE", (0, 1), (-1, -1), 10), + ("BOTTOMPADDING", (0, 0), (-1, 0), 10), + ("GRID", (0, 0), (-1, -1), 0.5, COLOR_BORDER_GRAY), + ( + "ROWBACKGROUNDS", + (1, 1), + (1, -1), + [COLOR_WHITE, COLOR_BG_BLUE], + ), + ] + ) + ) + elements.append(summary_table) + + return elements + + def create_charts_section(self, data: ComplianceData) -> list: + """ + Create the charts section with section analysis. + + Args: + data: Aggregated compliance data. + + Returns: + List of ReportLab elements. + """ + elements = [] + + # Section chart + elements.append(Paragraph("Compliance by Section", self.styles["h1"])) + elements.append(Spacer(1, 0.1 * inch)) + elements.append( + Paragraph( + "The following chart shows compliance percentage for each domain " + "of the CSA Cloud Controls Matrix:", + self.styles["normal_center"], + ) + ) + elements.append(Spacer(1, 0.1 * inch)) + + chart_buffer = self._create_section_chart(data) + chart_buffer.seek(0) + chart_image = Image(chart_buffer, width=6.5 * inch, height=5 * inch) + elements.append(chart_image) + elements.append(PageBreak()) + + # Section breakdown table + elements.append(Paragraph("Section Breakdown", self.styles["h1"])) + elements.append(Spacer(1, 0.1 * inch)) + + section_table = self._create_section_table(data) + elements.append(section_table) + + return elements + + def create_requirements_index(self, data: ComplianceData) -> list: + """ + Create the requirements index organized by section. + + Args: + data: Aggregated compliance data. + + Returns: + List of ReportLab elements. + """ + elements = [] + + elements.append(Paragraph("Requirements Index", self.styles["h1"])) + elements.append(Spacer(1, 0.1 * inch)) + + # Organize by section + sections = {} + for req in data.requirements: + m = get_requirement_metadata(req.id, data.attributes_by_requirement_id) + if m: + section = getattr(m, "Section", "Other") + + if section not in sections: + sections[section] = [] + + sections[section].append( + { + "id": req.id, + "description": req.description, + "status": req.status, + } + ) + + # Sort by CSA CCM section order + for section in CSA_CCM_SECTIONS: + if section not in sections: + continue + + elements.append(Paragraph(section, self.styles["h2"])) + + for req in sections[section]: + status_indicator = ( + "\u2713" if req["status"] == StatusChoices.PASS else "\u2717" + ) + if req["status"] == StatusChoices.MANUAL: + status_indicator = "\u2299" + + desc = ( + req["description"][:80] + "..." + if len(req["description"]) > 80 + else req["description"] + ) + elements.append( + Paragraph( + f"{status_indicator} {req['id']}: {desc}", + self.styles["normal"], + ) + ) + + elements.append(Spacer(1, 0.1 * inch)) + + return elements + + def _render_requirement_detail_extras(self, req, data: ComplianceData) -> list: + """ + Render CSA CCM attributes in the detailed findings view. + + Shows CCMLite flag, IaaS/PaaS/SaaS applicability, and + cross-framework references after the status badge for each requirement. + + Args: + req: The requirement being rendered. + data: Aggregated compliance data. + + Returns: + List of ReportLab elements. + """ + m = get_requirement_metadata(req.id, data.attributes_by_requirement_id) + if not m: + return [] + return self._format_requirement_attributes(m) + + def _format_requirement_attributes(self, m) -> list: + """ + Format CSA CCM requirement attributes as compact PDF elements. + + Displays CCMLite flag, IaaS/PaaS/SaaS applicability, and + cross-framework references from ScopeApplicability. + + Args: + m: Requirement metadata (CSA_CCM_Requirement_Attribute). + + Returns: + List of ReportLab elements. + """ + elements = [] + + # Applicability line: CCMLite | IaaS | PaaS | SaaS + ccm_lite = getattr(m, "CCMLite", "") + iaas = getattr(m, "IaaS", "") + paas = getattr(m, "PaaS", "") + saas = getattr(m, "SaaS", "") + + applicability_parts = [] + if ccm_lite: + applicability_parts.append(f"CCMLite: {ccm_lite}") + if iaas: + applicability_parts.append(f"IaaS: {iaas}") + if paas: + applicability_parts.append(f"PaaS: {paas}") + if saas: + applicability_parts.append(f"SaaS: {saas}") + + if applicability_parts: + elements.append( + Paragraph( + f"" + f"{'  |  '.join(applicability_parts)}" + f"", + self._attr_style(), + ) + ) + + # ScopeApplicability references (compact) + scope_list = getattr(m, "ScopeApplicability", []) + if scope_list: + refs = [] + for scope in scope_list: + ref_id = scope.get("ReferenceId", "") if isinstance(scope, dict) else "" + identifiers = ( + scope.get("Identifiers", []) if isinstance(scope, dict) else [] + ) + if ref_id and identifiers: + ids_str = ", ".join(str(i) for i in identifiers[:4]) + if len(identifiers) > 4: + ids_str += "..." + refs.append(f"{ref_id}: {ids_str}") + + if refs: + refs_text = "  |  ".join(refs) + elements.append( + Paragraph( + f"{refs_text}", + self._attr_style(), + ) + ) + + return elements + + def _attr_style(self): + """ + Return a compact style for attribute text lines. + + Returns: + ParagraphStyle for attribute display. + """ + from reportlab.lib.styles import ParagraphStyle + + return ParagraphStyle( + "AttrLine", + parent=self.styles["normal"], + fontSize=10, + spaceBefore=2, + spaceAfter=2, + leftIndent=30, + leading=13, + ) + + def _create_section_chart(self, data: ComplianceData): + """ + Create the section compliance chart. + + Args: + data: Aggregated compliance data. + + Returns: + BytesIO buffer containing the chart image. + """ + section_scores = defaultdict(lambda: {"passed": 0, "total": 0}) + + no_metadata_count = 0 + for req in data.requirements: + if req.status == StatusChoices.MANUAL: + continue + + m = get_requirement_metadata(req.id, data.attributes_by_requirement_id) + if m: + section = getattr(m, "Section", "Other") + section_scores[section]["total"] += 1 + if req.status == StatusChoices.PASS: + section_scores[section]["passed"] += 1 + else: + no_metadata_count += 1 + + if no_metadata_count > 0: + logger.warning( + "CSA CCM chart: %d requirements had no metadata", no_metadata_count + ) + + logger.info("CSA CCM section scores:") + for section in CSA_CCM_SECTIONS: + if section in section_scores: + scores = section_scores[section] + pct = ( + (scores["passed"] / scores["total"] * 100) + if scores["total"] > 0 + else 0 + ) + logger.info( + " %s: %d/%d (%.1f%%)", + section, + scores["passed"], + scores["total"], + pct, + ) + + # Build labels and values in CSA CCM section order + labels = [] + values = [] + for section in CSA_CCM_SECTIONS: + if section in section_scores and section_scores[section]["total"] > 0: + scores = section_scores[section] + pct = (scores["passed"] / scores["total"]) * 100 + # Use short name if available + label = CSA_CCM_SECTION_SHORT_NAMES.get(section, section) + labels.append(label) + values.append(pct) + + return create_horizontal_bar_chart( + labels=labels, + values=values, + xlabel="Compliance (%)", + color_func=get_chart_color_for_percentage, + ) + + def _create_section_table(self, data: ComplianceData) -> Table: + """ + Create the section breakdown table. + + Args: + data: Aggregated compliance data. + + Returns: + ReportLab Table element. + """ + section_scores = defaultdict(lambda: {"passed": 0, "failed": 0, "manual": 0}) + + for req in data.requirements: + m = get_requirement_metadata(req.id, data.attributes_by_requirement_id) + if m: + section = getattr(m, "Section", "Other") + + if req.status == StatusChoices.PASS: + section_scores[section]["passed"] += 1 + elif req.status == StatusChoices.FAIL: + section_scores[section]["failed"] += 1 + else: + section_scores[section]["manual"] += 1 + + table_data = [["Section", "Passed", "Failed", "Manual", "Compliance"]] + for section in CSA_CCM_SECTIONS: + if section not in section_scores: + continue + scores = section_scores[section] + total = scores["passed"] + scores["failed"] + pct = (scores["passed"] / total * 100) if total > 0 else 100 + # Use short name if available + label = CSA_CCM_SECTION_SHORT_NAMES.get(section, section) + table_data.append( + [ + label, + str(scores["passed"]), + str(scores["failed"]), + str(scores["manual"]), + f"{pct:.1f}%", + ] + ) + + table = Table( + table_data, + colWidths=[2.4 * inch, 0.9 * inch, 0.9 * inch, 0.9 * inch, 1.2 * inch], + ) + table.setStyle( + TableStyle( + [ + ("BACKGROUND", (0, 0), (-1, 0), COLOR_BLUE), + ("TEXTCOLOR", (0, 0), (-1, 0), COLOR_WHITE), + ("FONTNAME", (0, 0), (-1, 0), "FiraCode"), + ("FONTSIZE", (0, 0), (-1, 0), 10), + ("ALIGN", (0, 0), (-1, -1), "CENTER"), + ("VALIGN", (0, 0), (-1, -1), "MIDDLE"), + ("FONTSIZE", (0, 1), (-1, -1), 9), + ("GRID", (0, 0), (-1, -1), 0.5, COLOR_GRID_GRAY), + ("LEFTPADDING", (0, 0), (-1, -1), 6), + ("RIGHTPADDING", (0, 0), (-1, -1), 6), + ("TOPPADDING", (0, 0), (-1, -1), 4), + ("BOTTOMPADDING", (0, 0), (-1, -1), 4), + ( + "ROWBACKGROUNDS", + (0, 1), + (-1, -1), + [COLOR_WHITE, COLOR_BG_BLUE], + ), + ] + ) + ) + + return table diff --git a/api/src/backend/tasks/jobs/threatscore_utils.py b/api/src/backend/tasks/jobs/threatscore_utils.py index c46c279bf4..7d0f2b6ec3 100644 --- a/api/src/backend/tasks/jobs/threatscore_utils.py +++ b/api/src/backend/tasks/jobs/threatscore_utils.py @@ -114,6 +114,11 @@ def _calculate_requirements_data_from_statistics( requirement_status = StatusChoices.PASS else: requirement_status = StatusChoices.FAIL + elif requirement_checks: + # Requirement has checks but none produced findings — consistent + # with the dashboard's scan processing which treats this as PASS + # (no failed checks means the requirement is considered compliant). + requirement_status = StatusChoices.PASS else: requirement_status = StatusChoices.MANUAL diff --git a/api/src/backend/tasks/tasks.py b/api/src/backend/tasks/tasks.py index cbe44ab304..0cb41d066a 100644 --- a/api/src/backend/tasks/tasks.py +++ b/api/src/backend/tasks/tasks.py @@ -10,9 +10,9 @@ from config.django.base import DJANGO_FINDINGS_BATCH_SIZE, DJANGO_TMP_OUTPUT_DIR from django_celery_beat.models import PeriodicTask from tasks.jobs.attack_paths import ( attack_paths_scan, - db_utils as attack_paths_db_utils, can_provider_run_attack_paths_scan, ) +from tasks.jobs.attack_paths import db_utils as attack_paths_db_utils from tasks.jobs.backfill import ( backfill_compliance_summaries, backfill_daily_severity_summaries, @@ -906,11 +906,11 @@ def jira_integration_task( @handle_provider_deletion def generate_compliance_reports_task(tenant_id: str, scan_id: str, provider_id: str): """ - Optimized task to generate ThreatScore, ENS, and NIS2 reports with shared queries. + Optimized task to generate ThreatScore, ENS, NIS2, and CSA CCM reports with shared queries. This task is more efficient than running separate report tasks because it reuses database queries: - - Provider object fetched once (instead of three times) - - Requirement statistics aggregated once (instead of three times) + - Provider object fetched once (instead of multiple times) + - Requirement statistics aggregated once (instead of multiple times) - Can reduce database load by up to 50-70% Args: @@ -928,6 +928,7 @@ def generate_compliance_reports_task(tenant_id: str, scan_id: str, provider_id: generate_threatscore=True, generate_ens=True, generate_nis2=True, + generate_csa=True, ) diff --git a/api/src/backend/tasks/tests/test_reports_csa.py b/api/src/backend/tasks/tests/test_reports_csa.py new file mode 100644 index 0000000000..602b9bb28e --- /dev/null +++ b/api/src/backend/tasks/tests/test_reports_csa.py @@ -0,0 +1,1085 @@ +import io +from unittest.mock import Mock + +import pytest +from reportlab.platypus import PageBreak, Paragraph, Table +from tasks.jobs.reports import FRAMEWORK_REGISTRY, ComplianceData, RequirementData +from tasks.jobs.reports.csa import CSAReportGenerator + + +# Use string status values directly to avoid Django DB initialization +# These match api.models.StatusChoices values +class StatusChoices: + """Mock StatusChoices to avoid Django DB initialization.""" + + PASS = "PASS" + FAIL = "FAIL" + MANUAL = "MANUAL" + + +# ============================================================================= +# Fixtures +# ============================================================================= + + +@pytest.fixture +def csa_generator(): + """Create a CSAReportGenerator instance for testing.""" + config = FRAMEWORK_REGISTRY["csa_ccm"] + return CSAReportGenerator(config) + + +@pytest.fixture +def mock_csa_requirement_attribute_iam(): + """Create a mock CSA CCM requirement attribute for Identity & Access Management.""" + mock = Mock() + mock.Section = "Identity & Access Management" + mock.CCMLite = "Yes" + mock.IaaS = "Yes" + mock.PaaS = "Yes" + mock.SaaS = "Yes" + mock.ScopeApplicability = [ + {"ReferenceId": "ISO 27001", "Identifiers": ["A.9.1.1", "A.9.2.3"]}, + {"ReferenceId": "NIST 800-53", "Identifiers": ["AC-2", "AC-3", "AC-6"]}, + ] + return mock + + +@pytest.fixture +def mock_csa_requirement_attribute_logging(): + """Create a mock CSA CCM requirement attribute for Logging and Monitoring.""" + mock = Mock() + mock.Section = "Logging and Monitoring" + mock.CCMLite = "Yes" + mock.IaaS = "Yes" + mock.PaaS = "No" + mock.SaaS = "No" + mock.ScopeApplicability = [ + {"ReferenceId": "ISO 27001", "Identifiers": ["A.12.4.1"]}, + ] + return mock + + +@pytest.fixture +def mock_csa_requirement_attribute_crypto(): + """Create a mock CSA CCM requirement attribute for Cryptography.""" + mock = Mock() + mock.Section = "Cryptography, Encryption & Key Management" + mock.CCMLite = "No" + mock.IaaS = "Yes" + mock.PaaS = "Yes" + mock.SaaS = "No" + mock.ScopeApplicability = [] + return mock + + +@pytest.fixture +def basic_csa_compliance_data(): + """Create basic ComplianceData for CSA CCM testing.""" + return ComplianceData( + tenant_id="tenant-123", + scan_id="scan-456", + provider_id="provider-789", + compliance_id="csa_ccm_4.0_aws", + framework="CSA-CCM", + name="CSA Cloud Controls Matrix v4.0", + version="4.0", + description="Cloud Security Alliance Cloud Controls Matrix", + ) + + +# ============================================================================= +# Generator Initialization Tests +# ============================================================================= + + +class TestCSAGeneratorInitialization: + """Test suite for CSA generator initialization.""" + + def test_generator_creation(self, csa_generator): + """Test that CSA generator is created correctly.""" + assert csa_generator is not None + assert csa_generator.config.name == "csa_ccm" + assert csa_generator.config.language == "en" + + def test_generator_no_niveles(self, csa_generator): + """Test that CSA config does not use niveles.""" + assert csa_generator.config.has_niveles is False + + def test_generator_no_dimensions(self, csa_generator): + """Test that CSA config does not use dimensions.""" + assert csa_generator.config.has_dimensions is False + + def test_generator_no_risk_levels(self, csa_generator): + """Test that CSA config does not use risk levels.""" + assert csa_generator.config.has_risk_levels is False + + def test_generator_no_weight(self, csa_generator): + """Test that CSA config does not use weight.""" + assert csa_generator.config.has_weight is False + + +# ============================================================================= +# Cover Page Tests +# ============================================================================= + + +class TestCSACoverPage: + """Test suite for CSA cover page generation.""" + + def test_cover_page_has_logo(self, csa_generator, basic_csa_compliance_data): + """Test that cover page contains the Prowler logo.""" + basic_csa_compliance_data.requirements = [] + basic_csa_compliance_data.attributes_by_requirement_id = {} + + elements = csa_generator.create_cover_page(basic_csa_compliance_data) + + assert len(elements) > 0 + + def test_cover_page_has_title(self, csa_generator, basic_csa_compliance_data): + """Test that cover page contains the CSA CCM title.""" + basic_csa_compliance_data.requirements = [] + basic_csa_compliance_data.attributes_by_requirement_id = {} + + elements = csa_generator.create_cover_page(basic_csa_compliance_data) + + paragraphs = [e for e in elements if isinstance(e, Paragraph)] + content = " ".join(str(p.text) for p in paragraphs) + assert "CSA" in content or "CCM" in content or "Cloud Controls" in content + + def test_cover_page_has_metadata_table( + self, csa_generator, basic_csa_compliance_data + ): + """Test that cover page contains metadata table.""" + basic_csa_compliance_data.requirements = [] + basic_csa_compliance_data.attributes_by_requirement_id = {} + + elements = csa_generator.create_cover_page(basic_csa_compliance_data) + + tables = [e for e in elements if isinstance(e, Table)] + assert len(tables) >= 1 + + +# ============================================================================= +# Executive Summary Tests +# ============================================================================= + + +class TestCSAExecutiveSummary: + """Test suite for CSA executive summary generation.""" + + def test_executive_summary_has_english_title( + self, csa_generator, basic_csa_compliance_data + ): + """Test that executive summary has English title.""" + basic_csa_compliance_data.requirements = [] + basic_csa_compliance_data.attributes_by_requirement_id = {} + + elements = csa_generator.create_executive_summary(basic_csa_compliance_data) + + paragraphs = [e for e in elements if isinstance(e, Paragraph)] + content = " ".join(str(p.text) for p in paragraphs) + assert "Executive Summary" in content + + def test_executive_summary_calculates_compliance( + self, + csa_generator, + basic_csa_compliance_data, + mock_csa_requirement_attribute_iam, + ): + """Test that executive summary calculates compliance percentage.""" + basic_csa_compliance_data.requirements = [ + RequirementData( + id="REQ-001", + description="Passed requirement", + status=StatusChoices.PASS, + passed_findings=10, + failed_findings=0, + total_findings=10, + ), + RequirementData( + id="REQ-002", + description="Failed requirement", + status=StatusChoices.FAIL, + passed_findings=0, + failed_findings=10, + total_findings=10, + ), + ] + basic_csa_compliance_data.attributes_by_requirement_id = { + "REQ-001": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + "REQ-002": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + } + + elements = csa_generator.create_executive_summary(basic_csa_compliance_data) + + # Should contain tables with metrics + tables = [e for e in elements if isinstance(e, Table)] + assert len(tables) >= 1 + + def test_executive_summary_shows_all_statuses( + self, + csa_generator, + basic_csa_compliance_data, + mock_csa_requirement_attribute_iam, + ): + """Test that executive summary shows passed, failed, and manual counts.""" + basic_csa_compliance_data.requirements = [ + RequirementData( + id="REQ-001", + description="Passed", + status=StatusChoices.PASS, + passed_findings=10, + failed_findings=0, + total_findings=10, + ), + RequirementData( + id="REQ-002", + description="Failed", + status=StatusChoices.FAIL, + passed_findings=0, + failed_findings=10, + total_findings=10, + ), + RequirementData( + id="REQ-003", + description="Manual", + status=StatusChoices.MANUAL, + passed_findings=0, + failed_findings=0, + total_findings=0, + ), + ] + basic_csa_compliance_data.attributes_by_requirement_id = { + "REQ-001": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + "REQ-002": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + "REQ-003": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + } + + elements = csa_generator.create_executive_summary(basic_csa_compliance_data) + + # Should have a summary table with all statuses + assert len(elements) > 0 + + def test_executive_summary_excludes_manual_from_percentage( + self, + csa_generator, + basic_csa_compliance_data, + mock_csa_requirement_attribute_iam, + ): + """Test that manual requirements are excluded from compliance percentage.""" + basic_csa_compliance_data.requirements = [ + RequirementData( + id="REQ-001", + description="Passed", + status=StatusChoices.PASS, + passed_findings=10, + failed_findings=0, + total_findings=10, + ), + RequirementData( + id="REQ-002", + description="Manual", + status=StatusChoices.MANUAL, + passed_findings=0, + failed_findings=0, + total_findings=0, + ), + ] + basic_csa_compliance_data.attributes_by_requirement_id = { + "REQ-001": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + "REQ-002": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + } + + elements = csa_generator.create_executive_summary(basic_csa_compliance_data) + + # Should calculate 100% (only 1 evaluated requirement that passed) + assert len(elements) > 0 + + +# ============================================================================= +# Charts Section Tests +# ============================================================================= + + +class TestCSAChartsSection: + """Test suite for CSA charts section generation.""" + + def test_charts_section_has_section_chart_title( + self, csa_generator, basic_csa_compliance_data + ): + """Test that charts section has section compliance title.""" + basic_csa_compliance_data.requirements = [] + basic_csa_compliance_data.attributes_by_requirement_id = {} + + elements = csa_generator.create_charts_section(basic_csa_compliance_data) + + paragraphs = [e for e in elements if isinstance(e, Paragraph)] + content = " ".join(str(p.text) for p in paragraphs) + assert "Section" in content or "Compliance" in content + + def test_charts_section_has_page_break( + self, csa_generator, basic_csa_compliance_data + ): + """Test that charts section has page breaks.""" + basic_csa_compliance_data.requirements = [] + basic_csa_compliance_data.attributes_by_requirement_id = {} + + elements = csa_generator.create_charts_section(basic_csa_compliance_data) + + page_breaks = [e for e in elements if isinstance(e, PageBreak)] + assert len(page_breaks) >= 1 + + def test_charts_section_has_section_breakdown( + self, + csa_generator, + basic_csa_compliance_data, + mock_csa_requirement_attribute_iam, + ): + """Test that charts section includes section breakdown table.""" + basic_csa_compliance_data.requirements = [ + RequirementData( + id="REQ-001", + description="Test requirement", + status=StatusChoices.PASS, + passed_findings=10, + failed_findings=0, + total_findings=10, + ), + ] + basic_csa_compliance_data.attributes_by_requirement_id = { + "REQ-001": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + } + + elements = csa_generator.create_charts_section(basic_csa_compliance_data) + + paragraphs = [e for e in elements if isinstance(e, Paragraph)] + content = " ".join(str(p.text) for p in paragraphs) + assert "Section" in content or "Breakdown" in content + + +# ============================================================================= +# Section Chart Tests +# ============================================================================= + + +class TestCSASectionChart: + """Test suite for CSA section compliance chart.""" + + def test_section_chart_creation( + self, + csa_generator, + basic_csa_compliance_data, + mock_csa_requirement_attribute_iam, + ): + """Test that section chart is created successfully.""" + basic_csa_compliance_data.requirements = [ + RequirementData( + id="REQ-001", + description="Test requirement", + status=StatusChoices.PASS, + passed_findings=10, + failed_findings=0, + total_findings=10, + ), + ] + basic_csa_compliance_data.attributes_by_requirement_id = { + "REQ-001": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + } + + chart_buffer = csa_generator._create_section_chart(basic_csa_compliance_data) + + assert isinstance(chart_buffer, io.BytesIO) + assert chart_buffer.getvalue() # Not empty + + def test_section_chart_excludes_manual( + self, + csa_generator, + basic_csa_compliance_data, + mock_csa_requirement_attribute_iam, + ): + """Test that manual requirements are excluded from section chart.""" + basic_csa_compliance_data.requirements = [ + RequirementData( + id="REQ-001", + description="Auto requirement", + status=StatusChoices.PASS, + passed_findings=10, + failed_findings=0, + total_findings=10, + ), + RequirementData( + id="REQ-002", + description="Manual requirement", + status=StatusChoices.MANUAL, + passed_findings=0, + failed_findings=0, + total_findings=0, + ), + ] + basic_csa_compliance_data.attributes_by_requirement_id = { + "REQ-001": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + "REQ-002": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + } + + # Should not raise any errors + chart_buffer = csa_generator._create_section_chart(basic_csa_compliance_data) + assert isinstance(chart_buffer, io.BytesIO) + + def test_section_chart_multiple_sections( + self, + csa_generator, + basic_csa_compliance_data, + mock_csa_requirement_attribute_iam, + mock_csa_requirement_attribute_logging, + mock_csa_requirement_attribute_crypto, + ): + """Test section chart with multiple sections.""" + basic_csa_compliance_data.requirements = [ + RequirementData( + id="REQ-001", + description="IAM requirement", + status=StatusChoices.PASS, + passed_findings=10, + failed_findings=0, + total_findings=10, + ), + RequirementData( + id="REQ-002", + description="Logging requirement", + status=StatusChoices.FAIL, + passed_findings=0, + failed_findings=10, + total_findings=10, + ), + RequirementData( + id="REQ-003", + description="Crypto requirement", + status=StatusChoices.PASS, + passed_findings=5, + failed_findings=0, + total_findings=5, + ), + ] + basic_csa_compliance_data.attributes_by_requirement_id = { + "REQ-001": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + "REQ-002": { + "attributes": { + "req_attributes": [mock_csa_requirement_attribute_logging] + } + }, + "REQ-003": { + "attributes": { + "req_attributes": [mock_csa_requirement_attribute_crypto] + } + }, + } + + chart_buffer = csa_generator._create_section_chart(basic_csa_compliance_data) + assert isinstance(chart_buffer, io.BytesIO) + + +# ============================================================================= +# Section Table Tests +# ============================================================================= + + +class TestCSASectionTable: + """Test suite for CSA section breakdown table.""" + + def test_section_table_creation( + self, + csa_generator, + basic_csa_compliance_data, + mock_csa_requirement_attribute_iam, + ): + """Test that section table is created successfully.""" + basic_csa_compliance_data.requirements = [ + RequirementData( + id="REQ-001", + description="Test requirement", + status=StatusChoices.PASS, + passed_findings=10, + failed_findings=0, + total_findings=10, + ), + ] + basic_csa_compliance_data.attributes_by_requirement_id = { + "REQ-001": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + } + + table = csa_generator._create_section_table(basic_csa_compliance_data) + + assert isinstance(table, Table) + + def test_section_table_counts_statuses( + self, + csa_generator, + basic_csa_compliance_data, + mock_csa_requirement_attribute_iam, + ): + """Test that section table counts passed, failed, and manual.""" + basic_csa_compliance_data.requirements = [ + RequirementData( + id="REQ-001", + description="Passed", + status=StatusChoices.PASS, + passed_findings=10, + failed_findings=0, + total_findings=10, + ), + RequirementData( + id="REQ-002", + description="Failed", + status=StatusChoices.FAIL, + passed_findings=0, + failed_findings=10, + total_findings=10, + ), + RequirementData( + id="REQ-003", + description="Manual", + status=StatusChoices.MANUAL, + passed_findings=0, + failed_findings=0, + total_findings=0, + ), + ] + basic_csa_compliance_data.attributes_by_requirement_id = { + "REQ-001": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + "REQ-002": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + "REQ-003": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + } + + table = csa_generator._create_section_table(basic_csa_compliance_data) + assert isinstance(table, Table) + + +# ============================================================================= +# Requirements Index Tests +# ============================================================================= + + +class TestCSARequirementsIndex: + """Test suite for CSA requirements index generation.""" + + def test_requirements_index_has_title( + self, csa_generator, basic_csa_compliance_data + ): + """Test that requirements index has English title.""" + basic_csa_compliance_data.requirements = [] + basic_csa_compliance_data.attributes_by_requirement_id = {} + + elements = csa_generator.create_requirements_index(basic_csa_compliance_data) + + paragraphs = [e for e in elements if isinstance(e, Paragraph)] + content = " ".join(str(p.text) for p in paragraphs) + assert "Requirements Index" in content + + def test_requirements_index_organized_by_section( + self, + csa_generator, + basic_csa_compliance_data, + mock_csa_requirement_attribute_iam, + mock_csa_requirement_attribute_logging, + ): + """Test that requirements index is organized by section.""" + basic_csa_compliance_data.requirements = [ + RequirementData( + id="REQ-001", + description="IAM requirement", + status=StatusChoices.PASS, + passed_findings=10, + failed_findings=0, + total_findings=10, + ), + RequirementData( + id="REQ-002", + description="Logging requirement", + status=StatusChoices.PASS, + passed_findings=5, + failed_findings=0, + total_findings=5, + ), + ] + basic_csa_compliance_data.attributes_by_requirement_id = { + "REQ-001": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + "REQ-002": { + "attributes": { + "req_attributes": [mock_csa_requirement_attribute_logging] + } + }, + } + + elements = csa_generator.create_requirements_index(basic_csa_compliance_data) + + paragraphs = [e for e in elements if isinstance(e, Paragraph)] + content = " ".join(str(p.text) for p in paragraphs) + # Should have section headers + assert "Identity" in content or "Logging" in content or "Access" in content + + def test_requirements_index_shows_status_indicators( + self, + csa_generator, + basic_csa_compliance_data, + mock_csa_requirement_attribute_iam, + ): + """Test that requirements index shows pass/fail/manual indicators.""" + basic_csa_compliance_data.requirements = [ + RequirementData( + id="REQ-001", + description="Passed requirement", + status=StatusChoices.PASS, + passed_findings=10, + failed_findings=0, + total_findings=10, + ), + RequirementData( + id="REQ-002", + description="Failed requirement", + status=StatusChoices.FAIL, + passed_findings=0, + failed_findings=10, + total_findings=10, + ), + RequirementData( + id="REQ-003", + description="Manual requirement", + status=StatusChoices.MANUAL, + passed_findings=0, + failed_findings=0, + total_findings=0, + ), + ] + basic_csa_compliance_data.attributes_by_requirement_id = { + "REQ-001": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + "REQ-002": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + "REQ-003": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + } + + elements = csa_generator.create_requirements_index(basic_csa_compliance_data) + + paragraphs = [e for e in elements if isinstance(e, Paragraph)] + content = " ".join(str(p.text) for p in paragraphs) + # Should have status indicators + assert "\u2713" in content or "\u2717" in content or "\u2299" in content + + def test_requirements_index_truncates_long_descriptions( + self, csa_generator, basic_csa_compliance_data + ): + """Test that long descriptions are truncated.""" + mock_attr = Mock() + mock_attr.Section = "Identity & Access Management" + mock_attr.CCMLite = "Yes" + mock_attr.IaaS = "Yes" + mock_attr.PaaS = "Yes" + mock_attr.SaaS = "Yes" + mock_attr.ScopeApplicability = [] + + basic_csa_compliance_data.requirements = [ + RequirementData( + id="REQ-001", + description="A" * 100, + status=StatusChoices.PASS, + passed_findings=10, + failed_findings=0, + total_findings=10, + ), + ] + basic_csa_compliance_data.attributes_by_requirement_id = { + "REQ-001": {"attributes": {"req_attributes": [mock_attr]}}, + } + + # Should not raise errors + elements = csa_generator.create_requirements_index(basic_csa_compliance_data) + assert len(elements) > 0 + + +# ============================================================================= +# Requirement Attributes Tests +# ============================================================================= + + +class TestCSARequirementAttributes: + """Test suite for CSA requirement attributes display.""" + + def test_format_attributes_applicability_line( + self, csa_generator, mock_csa_requirement_attribute_iam + ): + """Test that applicability attributes (CCMLite, IaaS, PaaS, SaaS) are rendered.""" + elements = csa_generator._format_requirement_attributes( + mock_csa_requirement_attribute_iam + ) + + paragraphs = [e for e in elements if isinstance(e, Paragraph)] + content = " ".join(str(p.text) for p in paragraphs) + assert "CCMLite: Yes" in content + assert "IaaS: Yes" in content + assert "PaaS: Yes" in content + assert "SaaS: Yes" in content + + def test_format_attributes_partial_applicability( + self, csa_generator, mock_csa_requirement_attribute_logging + ): + """Test attributes when some applicability fields are 'No'.""" + elements = csa_generator._format_requirement_attributes( + mock_csa_requirement_attribute_logging + ) + + paragraphs = [e for e in elements if isinstance(e, Paragraph)] + content = " ".join(str(p.text) for p in paragraphs) + assert "IaaS: Yes" in content + assert "PaaS: No" in content + + def test_format_attributes_scope_applicability_refs( + self, csa_generator, mock_csa_requirement_attribute_iam + ): + """Test that ScopeApplicability references are displayed.""" + elements = csa_generator._format_requirement_attributes( + mock_csa_requirement_attribute_iam + ) + + paragraphs = [e for e in elements if isinstance(e, Paragraph)] + content = " ".join(str(p.text) for p in paragraphs) + assert "ISO 27001" in content + assert "NIST 800-53" in content + + def test_format_attributes_empty_scope( + self, csa_generator, mock_csa_requirement_attribute_crypto + ): + """Test that empty ScopeApplicability produces no reference line.""" + elements = csa_generator._format_requirement_attributes( + mock_csa_requirement_attribute_crypto + ) + + # Should have applicability line but no scope reference line + paragraphs = [e for e in elements if isinstance(e, Paragraph)] + assert len(paragraphs) == 1 # Only the applicability line + + def test_format_attributes_no_applicability(self, csa_generator): + """Test attributes when all applicability fields are empty.""" + mock = Mock() + mock.CCMLite = "" + mock.IaaS = "" + mock.PaaS = "" + mock.SaaS = "" + mock.ScopeApplicability = [] + + elements = csa_generator._format_requirement_attributes(mock) + + assert len(elements) == 0 + + def test_format_attributes_truncates_long_identifiers(self, csa_generator): + """Test that ScopeApplicability with many identifiers is truncated.""" + mock = Mock() + mock.CCMLite = "Yes" + mock.IaaS = "Yes" + mock.PaaS = "Yes" + mock.SaaS = "Yes" + mock.ScopeApplicability = [ + { + "ReferenceId": "NIST 800-53", + "Identifiers": ["AC-1", "AC-2", "AC-3", "AC-4", "AC-5", "AC-6"], + }, + ] + + elements = csa_generator._format_requirement_attributes(mock) + + paragraphs = [e for e in elements if isinstance(e, Paragraph)] + content = " ".join(str(p.text) for p in paragraphs) + # Should show first 4 and ellipsis + assert "AC-1" in content + assert "AC-4" in content + assert "..." in content + + def test_attr_style_returns_paragraph_style(self, csa_generator): + """Test that _attr_style returns a valid ParagraphStyle.""" + from reportlab.lib.styles import ParagraphStyle + + style = csa_generator._attr_style() + assert isinstance(style, ParagraphStyle) + assert style.fontSize == 10 + assert style.leftIndent == 30 + + def test_render_requirement_detail_extras( + self, + csa_generator, + basic_csa_compliance_data, + mock_csa_requirement_attribute_iam, + ): + """Test that detail extras hook renders CSA attributes.""" + req = RequirementData( + id="REQ-001", + description="IAM requirement", + status=StatusChoices.PASS, + passed_findings=10, + failed_findings=0, + total_findings=10, + ) + basic_csa_compliance_data.attributes_by_requirement_id = { + "REQ-001": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + } + + elements = csa_generator._render_requirement_detail_extras( + req, basic_csa_compliance_data + ) + + paragraphs = [e for e in elements if isinstance(e, Paragraph)] + content = " ".join(str(p.text) for p in paragraphs) + assert "CCMLite" in content + assert "ISO 27001" in content + + def test_render_requirement_detail_extras_no_metadata( + self, + csa_generator, + basic_csa_compliance_data, + ): + """Test that detail extras returns empty when no metadata found.""" + req = RequirementData( + id="REQ-UNKNOWN", + description="No metadata", + status=StatusChoices.PASS, + passed_findings=0, + failed_findings=0, + total_findings=0, + ) + basic_csa_compliance_data.attributes_by_requirement_id = {} + + elements = csa_generator._render_requirement_detail_extras( + req, basic_csa_compliance_data + ) + + assert elements == [] + + +# ============================================================================= +# Empty Data Tests +# ============================================================================= + + +class TestCSAEmptyData: + """Test suite for CSA with empty or minimal data.""" + + def test_executive_summary_empty_requirements( + self, csa_generator, basic_csa_compliance_data + ): + """Test executive summary with no requirements.""" + basic_csa_compliance_data.requirements = [] + basic_csa_compliance_data.attributes_by_requirement_id = {} + + elements = csa_generator.create_executive_summary(basic_csa_compliance_data) + + assert len(elements) > 0 + + def test_charts_section_empty_requirements( + self, csa_generator, basic_csa_compliance_data + ): + """Test charts section with no requirements.""" + basic_csa_compliance_data.requirements = [] + basic_csa_compliance_data.attributes_by_requirement_id = {} + + elements = csa_generator.create_charts_section(basic_csa_compliance_data) + + assert len(elements) > 0 + + def test_requirements_index_empty(self, csa_generator, basic_csa_compliance_data): + """Test requirements index with no requirements.""" + basic_csa_compliance_data.requirements = [] + basic_csa_compliance_data.attributes_by_requirement_id = {} + + elements = csa_generator.create_requirements_index(basic_csa_compliance_data) + + # Should at least have the title + assert len(elements) >= 1 + + +# ============================================================================= +# All Pass / All Fail Tests +# ============================================================================= + + +class TestCSAEdgeCases: + """Test suite for CSA edge cases.""" + + def test_all_requirements_pass( + self, + csa_generator, + basic_csa_compliance_data, + mock_csa_requirement_attribute_iam, + ): + """Test with all requirements passing.""" + basic_csa_compliance_data.requirements = [ + RequirementData( + id=f"REQ-{i:03d}", + description=f"Passing requirement {i}", + status=StatusChoices.PASS, + passed_findings=10, + failed_findings=0, + total_findings=10, + ) + for i in range(1, 6) + ] + basic_csa_compliance_data.attributes_by_requirement_id = { + f"REQ-{i:03d}": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + } + for i in range(1, 6) + } + + elements = csa_generator.create_executive_summary(basic_csa_compliance_data) + assert len(elements) > 0 + + def test_all_requirements_fail( + self, + csa_generator, + basic_csa_compliance_data, + mock_csa_requirement_attribute_iam, + ): + """Test with all requirements failing.""" + basic_csa_compliance_data.requirements = [ + RequirementData( + id=f"REQ-{i:03d}", + description=f"Failing requirement {i}", + status=StatusChoices.FAIL, + passed_findings=0, + failed_findings=10, + total_findings=10, + ) + for i in range(1, 6) + ] + basic_csa_compliance_data.attributes_by_requirement_id = { + f"REQ-{i:03d}": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + } + for i in range(1, 6) + } + + elements = csa_generator.create_executive_summary(basic_csa_compliance_data) + assert len(elements) > 0 + + def test_all_requirements_manual( + self, + csa_generator, + basic_csa_compliance_data, + mock_csa_requirement_attribute_iam, + ): + """Test with all requirements being manual.""" + basic_csa_compliance_data.requirements = [ + RequirementData( + id=f"REQ-{i:03d}", + description=f"Manual requirement {i}", + status=StatusChoices.MANUAL, + passed_findings=0, + failed_findings=0, + total_findings=0, + ) + for i in range(1, 6) + ] + basic_csa_compliance_data.attributes_by_requirement_id = { + f"REQ-{i:03d}": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + } + for i in range(1, 6) + } + + # Should handle gracefully - compliance should be 100% when no evaluated + elements = csa_generator.create_executive_summary(basic_csa_compliance_data) + assert len(elements) > 0 + + +# ============================================================================= +# Integration Tests +# ============================================================================= + + +class TestCSAIntegration: + """Integration tests for CSA report generation.""" + + def test_full_report_generation_flow( + self, + csa_generator, + basic_csa_compliance_data, + mock_csa_requirement_attribute_iam, + mock_csa_requirement_attribute_logging, + ): + """Test the complete report generation flow.""" + basic_csa_compliance_data.requirements = [ + RequirementData( + id="REQ-001", + description="IAM passed", + status=StatusChoices.PASS, + passed_findings=10, + failed_findings=0, + total_findings=10, + ), + RequirementData( + id="REQ-002", + description="Logging failed", + status=StatusChoices.FAIL, + passed_findings=0, + failed_findings=5, + total_findings=5, + ), + ] + basic_csa_compliance_data.attributes_by_requirement_id = { + "REQ-001": { + "attributes": {"req_attributes": [mock_csa_requirement_attribute_iam]} + }, + "REQ-002": { + "attributes": { + "req_attributes": [mock_csa_requirement_attribute_logging] + } + }, + } + + # Generate all sections + exec_summary = csa_generator.create_executive_summary(basic_csa_compliance_data) + charts = csa_generator.create_charts_section(basic_csa_compliance_data) + index = csa_generator.create_requirements_index(basic_csa_compliance_data) + + # All sections should generate without errors + assert len(exec_summary) > 0 + assert len(charts) > 0 + assert len(index) > 0 diff --git a/ui/CHANGELOG.md b/ui/CHANGELOG.md index 1e7e252047..8d5f47ac6a 100644 --- a/ui/CHANGELOG.md +++ b/ui/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to the **Prowler UI** are documented in this file. ## [1.19.0] (Prowler UNRELEASED) +### 🚀 Added + +- PDF report available for the CSA CCM compliance framework [(#10088)](https://github.com/prowler-cloud/prowler/pull/10088) + ### 🔄 Changed - Attack Paths: Query list now shows their name and short description, when one is selected it also shows a longer description and an attribution if it has it [(#9983)](https://github.com/prowler-cloud/prowler/pull/9983) diff --git a/ui/lib/compliance/compliance-report-types.ts b/ui/lib/compliance/compliance-report-types.ts index c95cbf6362..c7d65ac726 100644 --- a/ui/lib/compliance/compliance-report-types.ts +++ b/ui/lib/compliance/compliance-report-types.ts @@ -13,6 +13,7 @@ export const COMPLIANCE_REPORT_TYPES = { THREATSCORE: "threatscore", ENS: "ens", NIS2: "nis2", + CSA_CCM: "csa", // Future report types can be added here: // CIS: "cis", // NIST: "nist", @@ -34,6 +35,7 @@ export const COMPLIANCE_REPORT_DISPLAY_NAMES: Record< [COMPLIANCE_REPORT_TYPES.THREATSCORE]: "ThreatScore", [COMPLIANCE_REPORT_TYPES.ENS]: "ENS RD2022", [COMPLIANCE_REPORT_TYPES.NIS2]: "NIS2", + [COMPLIANCE_REPORT_TYPES.CSA_CCM]: "CSA CCM", // Add display names for future report types here }; @@ -47,6 +49,7 @@ export const COMPLIANCE_REPORT_BUTTON_LABELS: Record< [COMPLIANCE_REPORT_TYPES.THREATSCORE]: "PDF ThreatScore Report", [COMPLIANCE_REPORT_TYPES.ENS]: "PDF ENS Report", [COMPLIANCE_REPORT_TYPES.NIS2]: "PDF NIS2 Report", + [COMPLIANCE_REPORT_TYPES.CSA_CCM]: "PDF CSA CCM Report", // Add button labels for future report types here }; @@ -58,6 +61,7 @@ const FRAMEWORK_TO_REPORT_TYPE: Record = { ProwlerThreatScore: COMPLIANCE_REPORT_TYPES.THREATSCORE, ENS: COMPLIANCE_REPORT_TYPES.ENS, NIS2: COMPLIANCE_REPORT_TYPES.NIS2, + "CSA-CCM": COMPLIANCE_REPORT_TYPES.CSA_CCM, // Add new framework mappings here as PDF support is added: // "CIS-1.5": COMPLIANCE_REPORT_TYPES.CIS, // "NIST-800-53": COMPLIANCE_REPORT_TYPES.NIST,