mirror of
https://github.com/prowler-cloud/prowler.git
synced 2025-12-19 05:17:47 +00:00
feat(nis2): support PDF reporting (#9170)
Co-authored-by: alejandrobailo <alejandrobailo94@gmail.com> Co-authored-by: Josema Camacho <josema@prowler.com>
This commit is contained in:
@@ -15,6 +15,7 @@ All notable changes to the **Prowler API** are documented in this file.
|
||||
- Support C5 compliance framework for the GCP provider [(#9097)](https://github.com/prowler-cloud/prowler/pull/9097)
|
||||
- Support for Amazon Bedrock and OpenAI compatible providers in Lighthouse AI [(#8957)](https://github.com/prowler-cloud/prowler/pull/8957)
|
||||
- Support PDF reporting for ENS compliance framework [(#9158)](https://github.com/prowler-cloud/prowler/pull/9158)
|
||||
- Support PDF reporting for NIS2 compliance framework [(#9170)](https://github.com/prowler-cloud/prowler/pull/9170)
|
||||
- Tenant-wide ThreatScore overview aggregation and snapshot persistence with backfill support [(#9148)](https://github.com/prowler-cloud/prowler/pull/9148)
|
||||
- Added `metadata`, `details`, and `partition` attributes to `/resources` endpoint & `details`, and `partition` to `/findings` endpoint [(#9098)](https://github.com/prowler-cloud/prowler/pull/9098)
|
||||
- Support for MongoDB Atlas provider [(#9167)](https://github.com/prowler-cloud/prowler/pull/9167)
|
||||
|
||||
2
api/poetry.lock
generated
2
api/poetry.lock
generated
@@ -4651,7 +4651,7 @@ tzlocal = "5.3.1"
|
||||
type = "git"
|
||||
url = "https://github.com/prowler-cloud/prowler.git"
|
||||
reference = "master"
|
||||
resolved_reference = "a52697bfdfee83d14a49c11dcbe96888b5cd767e"
|
||||
resolved_reference = "ced122ac0d9c078933b996d04a9f9aff9bb6cc75"
|
||||
|
||||
[[package]]
|
||||
name = "psutil"
|
||||
|
||||
@@ -8985,6 +8985,72 @@ paths:
|
||||
'404':
|
||||
description: The scan has no ENS reports, or the ENS report generation task
|
||||
has not started yet
|
||||
/api/v1/scans/{id}/nis2:
|
||||
get:
|
||||
operationId: scans_nis2_retrieve
|
||||
description: Download NIS2 compliance report (Directive (EU) 2022/2555) as a
|
||||
PDF file.
|
||||
summary: Retrieve NIS2 compliance report
|
||||
parameters:
|
||||
- in: query
|
||||
name: fields[scans]
|
||||
schema:
|
||||
type: array
|
||||
items:
|
||||
type: string
|
||||
enum:
|
||||
- name
|
||||
- trigger
|
||||
- state
|
||||
- unique_resource_count
|
||||
- progress
|
||||
- duration
|
||||
- provider
|
||||
- task
|
||||
- inserted_at
|
||||
- started_at
|
||||
- completed_at
|
||||
- scheduled_at
|
||||
- next_scan_at
|
||||
- processor
|
||||
- url
|
||||
description: endpoint return only specific fields in the response on a per-type
|
||||
basis by including a fields[TYPE] query parameter.
|
||||
explode: false
|
||||
- in: path
|
||||
name: id
|
||||
schema:
|
||||
type: string
|
||||
format: uuid
|
||||
description: A UUID string identifying this scan.
|
||||
required: true
|
||||
- in: query
|
||||
name: include
|
||||
schema:
|
||||
type: array
|
||||
items:
|
||||
type: string
|
||||
enum:
|
||||
- provider
|
||||
description: include query parameter to allow the client to customize which
|
||||
related resources should be returned.
|
||||
explode: false
|
||||
tags:
|
||||
- Scan
|
||||
security:
|
||||
- JWT or API Key: []
|
||||
responses:
|
||||
'200':
|
||||
description: PDF file containing the NIS2 compliance report
|
||||
'202':
|
||||
description: The task is in progress
|
||||
'401':
|
||||
description: API key missing or user not Authenticated
|
||||
'403':
|
||||
description: There is a problem with credentials
|
||||
'404':
|
||||
description: The scan has no NIS2 reports, or the NIS2 report generation
|
||||
task has not started yet
|
||||
/api/v1/schedules/daily:
|
||||
post:
|
||||
operationId: schedules_daily_create
|
||||
|
||||
@@ -1680,6 +1680,25 @@ class ProviderViewSet(DisablePaginationMixin, BaseRLSViewSet):
|
||||
),
|
||||
},
|
||||
),
|
||||
nis2=extend_schema(
|
||||
tags=["Scan"],
|
||||
summary="Retrieve NIS2 compliance report",
|
||||
description="Download NIS2 compliance report (Directive (EU) 2022/2555) as a PDF file.",
|
||||
request=None,
|
||||
responses={
|
||||
200: OpenApiResponse(
|
||||
description="PDF file containing the NIS2 compliance report"
|
||||
),
|
||||
202: OpenApiResponse(description="The task is in progress"),
|
||||
401: OpenApiResponse(
|
||||
description="API key missing or user not Authenticated"
|
||||
),
|
||||
403: OpenApiResponse(description="There is a problem with credentials"),
|
||||
404: OpenApiResponse(
|
||||
description="The scan has no NIS2 reports, or the NIS2 report generation task has not started yet"
|
||||
),
|
||||
},
|
||||
),
|
||||
)
|
||||
@method_decorator(CACHE_DECORATOR, name="list")
|
||||
@method_decorator(CACHE_DECORATOR, name="retrieve")
|
||||
@@ -1742,6 +1761,9 @@ class ScanViewSet(BaseRLSViewSet):
|
||||
elif self.action == "ens":
|
||||
if hasattr(self, "response_serializer_class"):
|
||||
return self.response_serializer_class
|
||||
elif self.action == "nis2":
|
||||
if hasattr(self, "response_serializer_class"):
|
||||
return self.response_serializer_class
|
||||
return super().get_serializer_class()
|
||||
|
||||
def partial_update(self, request, *args, **kwargs):
|
||||
@@ -2064,6 +2086,45 @@ class ScanViewSet(BaseRLSViewSet):
|
||||
content, filename = loader
|
||||
return self._serve_file(content, filename, "application/pdf")
|
||||
|
||||
@action(
|
||||
detail=True,
|
||||
methods=["get"],
|
||||
url_name="nis2",
|
||||
)
|
||||
def nis2(self, request, pk=None):
|
||||
scan = self.get_object()
|
||||
running_resp = self._get_task_status(scan)
|
||||
if running_resp:
|
||||
return running_resp
|
||||
|
||||
if not scan.output_location:
|
||||
return Response(
|
||||
{
|
||||
"detail": "The scan has no reports, or the NIS2 report generation task has not started yet."
|
||||
},
|
||||
status=status.HTTP_404_NOT_FOUND,
|
||||
)
|
||||
|
||||
if scan.output_location.startswith("s3://"):
|
||||
bucket = env.str("DJANGO_OUTPUT_S3_AWS_OUTPUT_BUCKET", "")
|
||||
key_prefix = scan.output_location.removeprefix(f"s3://{bucket}/")
|
||||
prefix = os.path.join(
|
||||
os.path.dirname(key_prefix),
|
||||
"nis2",
|
||||
"*_nis2_report.pdf",
|
||||
)
|
||||
loader = self._load_file(prefix, s3=True, bucket=bucket, list_objects=True)
|
||||
else:
|
||||
base = os.path.dirname(scan.output_location)
|
||||
pattern = os.path.join(base, "nis2", "*_nis2_report.pdf")
|
||||
loader = self._load_file(pattern, s3=False)
|
||||
|
||||
if isinstance(loader, Response):
|
||||
return loader
|
||||
|
||||
content, filename = loader
|
||||
return self._serve_file(content, filename, "application/pdf")
|
||||
|
||||
def create(self, request, *args, **kwargs):
|
||||
input_serializer = self.get_serializer(data=request.data)
|
||||
input_serializer.is_valid(raise_exception=True)
|
||||
|
||||
BIN
api/src/backend/tasks/assets/img/nis2_logo.png
Normal file
BIN
api/src/backend/tasks/assets/img/nis2_logo.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 94 KiB |
@@ -328,6 +328,8 @@ def _generate_compliance_output_directory(
|
||||
'/tmp/tenant-1234/scan-5678/threatscore/prowler-output-aws-20230215123456'
|
||||
>>> _generate_compliance_output_directory("/tmp", "aws", "tenant-1234", "scan-5678", "ens")
|
||||
'/tmp/tenant-1234/scan-5678/ens/prowler-output-aws-20230215123456'
|
||||
>>> _generate_compliance_output_directory("/tmp", "aws", "tenant-1234", "scan-5678", "nis2")
|
||||
'/tmp/tenant-1234/scan-5678/nis2/prowler-output-aws-20230215123456'
|
||||
"""
|
||||
return _build_output_path(
|
||||
output_directory,
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -692,13 +692,12 @@ def jira_integration_task(
|
||||
)
|
||||
def generate_compliance_reports_task(tenant_id: str, scan_id: str, provider_id: str):
|
||||
"""
|
||||
Optimized task to generate both ThreatScore and ENS reports with shared queries.
|
||||
Optimized task to generate ThreatScore, ENS, and NIS2 reports with shared queries.
|
||||
|
||||
This task is more efficient than running generate_threatscore_report_task and
|
||||
generate_ens_report_task separately because it reuses database queries:
|
||||
- Provider object fetched once (instead of twice)
|
||||
- Requirement statistics aggregated once (instead of twice)
|
||||
- Can reduce database load by up to 50%
|
||||
This task is more efficient than running separate report tasks because it reuses database queries:
|
||||
- Provider object fetched once (instead of three times)
|
||||
- Requirement statistics aggregated once (instead of three times)
|
||||
- Can reduce database load by up to 50-70%
|
||||
|
||||
Args:
|
||||
tenant_id (str): The tenant identifier.
|
||||
@@ -706,7 +705,7 @@ def generate_compliance_reports_task(tenant_id: str, scan_id: str, provider_id:
|
||||
provider_id (str): The provider identifier.
|
||||
|
||||
Returns:
|
||||
dict: Results for both reports containing upload status and paths.
|
||||
dict: Results for all reports containing upload status and paths.
|
||||
"""
|
||||
return generate_compliance_reports_job(
|
||||
tenant_id=tenant_id,
|
||||
@@ -714,6 +713,7 @@ def generate_compliance_reports_task(tenant_id: str, scan_id: str, provider_id:
|
||||
provider_id=provider_id,
|
||||
generate_threatscore=True,
|
||||
generate_ens=True,
|
||||
generate_nis2=True,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@ from tasks.jobs.report import (
|
||||
COLOR_HIGH_RISK,
|
||||
COLOR_LOW_RISK,
|
||||
COLOR_MEDIUM_RISK,
|
||||
COLOR_NIS2_PRIMARY,
|
||||
COLOR_SAFE,
|
||||
_create_dimensions_radar_chart,
|
||||
_create_ens_dimension_badges,
|
||||
@@ -29,6 +30,9 @@ from tasks.jobs.report import (
|
||||
_create_header_table_style,
|
||||
_create_info_table_style,
|
||||
_create_marco_category_chart,
|
||||
_create_nis2_requirements_index,
|
||||
_create_nis2_section_chart,
|
||||
_create_nis2_subsection_table,
|
||||
_create_pdf_styles,
|
||||
_create_risk_component,
|
||||
_create_section_score_chart,
|
||||
@@ -41,6 +45,7 @@ from tasks.jobs.report import (
|
||||
_load_findings_for_requirement_checks,
|
||||
_safe_getattr,
|
||||
generate_compliance_reports_job,
|
||||
generate_nis2_report,
|
||||
generate_threatscore_report,
|
||||
)
|
||||
from tasks.jobs.threatscore_utils import (
|
||||
@@ -1266,13 +1271,14 @@ class TestGenerateComplianceReportsOptimized:
|
||||
|
||||
@patch("tasks.jobs.report.rmtree")
|
||||
@patch("tasks.jobs.report._upload_to_s3")
|
||||
@patch("tasks.jobs.report.generate_nis2_report")
|
||||
@patch("tasks.jobs.report.generate_ens_report")
|
||||
@patch("tasks.jobs.report.generate_threatscore_report")
|
||||
@patch("tasks.jobs.report._generate_compliance_output_directory")
|
||||
@patch("tasks.jobs.report._aggregate_requirement_statistics_from_database")
|
||||
@patch("tasks.jobs.report.Provider")
|
||||
@patch("tasks.jobs.report.ScanSummary")
|
||||
def test_generates_both_reports_with_shared_queries(
|
||||
def test_generates_reports_with_shared_queries(
|
||||
self,
|
||||
mock_scan_summary,
|
||||
mock_provider,
|
||||
@@ -1280,10 +1286,11 @@ class TestGenerateComplianceReportsOptimized:
|
||||
mock_gen_dir,
|
||||
mock_gen_threatscore,
|
||||
mock_gen_ens,
|
||||
mock_gen_nis2,
|
||||
mock_upload,
|
||||
mock_rmtree,
|
||||
):
|
||||
"""Test that both reports are generated with shared database queries."""
|
||||
"""Test that requested reports are generated with shared database queries."""
|
||||
# Setup mocks
|
||||
mock_scan_summary.objects.filter.return_value.exists.return_value = True
|
||||
mock_provider_obj = Mock()
|
||||
@@ -1294,12 +1301,14 @@ class TestGenerateComplianceReportsOptimized:
|
||||
mock_aggregate_stats.return_value = {"check-1": {"passed": 10, "total": 15}}
|
||||
# Mock returns different paths for different compliance_framework calls
|
||||
mock_gen_dir.side_effect = [
|
||||
"/tmp/threatscore_path", # First call with compliance_framework="threatscore"
|
||||
"/tmp/ens_path", # Second call with compliance_framework="ens"
|
||||
"/tmp/reports/threatscore/output", # First call with compliance_framework="threatscore"
|
||||
"/tmp/reports/ens/output", # Second call with compliance_framework="ens"
|
||||
"/tmp/reports/nis2/output", # Third call with compliance_framework="nis2"
|
||||
]
|
||||
mock_upload.side_effect = [
|
||||
"s3://bucket/threatscore.pdf",
|
||||
"s3://bucket/ens.pdf",
|
||||
"s3://bucket/nis2.pdf",
|
||||
]
|
||||
|
||||
result = generate_compliance_reports_job(
|
||||
@@ -1319,6 +1328,7 @@ class TestGenerateComplianceReportsOptimized:
|
||||
# Verify both report generation functions were called with shared data
|
||||
assert mock_gen_threatscore.call_count == 1
|
||||
assert mock_gen_ens.call_count == 1
|
||||
assert mock_gen_nis2.call_count == 1
|
||||
|
||||
# Verify provider_obj and requirement_statistics were passed to both
|
||||
threatscore_call_kwargs = mock_gen_threatscore.call_args[1]
|
||||
@@ -1333,11 +1343,24 @@ class TestGenerateComplianceReportsOptimized:
|
||||
"check-1": {"passed": 10, "total": 15}
|
||||
}
|
||||
|
||||
nis2_call_kwargs = mock_gen_nis2.call_args[1]
|
||||
assert nis2_call_kwargs["provider_obj"] == mock_provider_obj
|
||||
assert nis2_call_kwargs["requirement_statistics"] == {
|
||||
"check-1": {"passed": 10, "total": 15}
|
||||
}
|
||||
|
||||
# Verify both reports were uploaded successfully
|
||||
assert result["threatscore"]["upload"] is True
|
||||
assert result["threatscore"]["path"] == "s3://bucket/threatscore.pdf"
|
||||
assert result["ens"]["upload"] is True
|
||||
assert result["ens"]["path"] == "s3://bucket/ens.pdf"
|
||||
assert result["nis2"]["upload"] is True
|
||||
assert result["nis2"]["path"] == "s3://bucket/nis2.pdf"
|
||||
|
||||
# Cleanup should remove the temporary parent directory when everything uploads
|
||||
mock_rmtree.assert_called_once()
|
||||
cleanup_path_arg = mock_rmtree.call_args[0][0]
|
||||
assert str(cleanup_path_arg) == "/tmp/reports"
|
||||
|
||||
@patch("tasks.jobs.report._aggregate_requirement_statistics_from_database")
|
||||
@patch("tasks.jobs.report.Provider")
|
||||
@@ -1423,3 +1446,362 @@ class TestGenerateComplianceReportsOptimized:
|
||||
# Verify DB was only queried for check-2 (not check-1)
|
||||
filter_call = mock_finding_class.all_objects.filter.call_args
|
||||
assert filter_call[1]["check_id__in"] == ["check-2"]
|
||||
|
||||
|
||||
class TestNIS2SectionChart:
|
||||
"""Test suite for _create_nis2_section_chart function."""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_matplotlib(self):
|
||||
"""Setup matplotlib backend for tests."""
|
||||
matplotlib.use("Agg")
|
||||
|
||||
def test_creates_chart_with_sections(self):
|
||||
"""Verify chart is created with correct sections and compliance data."""
|
||||
# Mock requirement with NIS2 section attribute
|
||||
mock_attr = Mock()
|
||||
mock_attr.Section = (
|
||||
"1 POLICY ON THE SECURITY OF NETWORK AND INFORMATION SYSTEMS"
|
||||
)
|
||||
|
||||
requirements_list = [
|
||||
{
|
||||
"id": "1.1.1.a",
|
||||
"description": "Test requirement",
|
||||
"attributes": {
|
||||
"passed_findings": 5,
|
||||
"total_findings": 10,
|
||||
"status": StatusChoices.FAIL,
|
||||
},
|
||||
}
|
||||
]
|
||||
|
||||
attributes_by_requirement_id = {
|
||||
"1.1.1.a": {
|
||||
"attributes": {
|
||||
"req_attributes": [mock_attr],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# Call function
|
||||
result = _create_nis2_section_chart(
|
||||
requirements_list, attributes_by_requirement_id
|
||||
)
|
||||
|
||||
# Verify result is a BytesIO buffer
|
||||
assert isinstance(result, io.BytesIO)
|
||||
assert result.tell() > 0 # Buffer has content
|
||||
|
||||
def test_handles_empty_requirements(self):
|
||||
"""Verify chart handles empty requirements gracefully."""
|
||||
result = _create_nis2_section_chart([], {})
|
||||
|
||||
# Verify result is still a valid BytesIO buffer
|
||||
assert isinstance(result, io.BytesIO)
|
||||
|
||||
def test_calculates_compliance_percentage_correctly(self):
|
||||
"""Verify compliance percentage calculation is correct."""
|
||||
mock_attr1 = Mock()
|
||||
mock_attr1.Section = "11 ACCESS CONTROL"
|
||||
|
||||
mock_attr2 = Mock()
|
||||
mock_attr2.Section = "11 ACCESS CONTROL"
|
||||
|
||||
requirements_list = [
|
||||
{
|
||||
"id": "11.1.1",
|
||||
"description": "Test 1",
|
||||
"attributes": {
|
||||
"passed_findings": 8,
|
||||
"total_findings": 10, # 80%
|
||||
"status": StatusChoices.PASS,
|
||||
},
|
||||
},
|
||||
{
|
||||
"id": "11.1.2",
|
||||
"description": "Test 2",
|
||||
"attributes": {
|
||||
"passed_findings": 10,
|
||||
"total_findings": 10, # 100%
|
||||
"status": StatusChoices.PASS,
|
||||
},
|
||||
},
|
||||
]
|
||||
|
||||
attributes_by_requirement_id = {
|
||||
"11.1.1": {"attributes": {"req_attributes": [mock_attr1]}},
|
||||
"11.1.2": {"attributes": {"req_attributes": [mock_attr2]}},
|
||||
}
|
||||
|
||||
# Call function
|
||||
result = _create_nis2_section_chart(
|
||||
requirements_list, attributes_by_requirement_id
|
||||
)
|
||||
|
||||
# Expected: (8+10)/(10+10) = 18/20 = 90%
|
||||
assert isinstance(result, io.BytesIO)
|
||||
|
||||
|
||||
class TestNIS2SubsectionTable:
|
||||
"""Test suite for _create_nis2_subsection_table function."""
|
||||
|
||||
def test_creates_table_with_subsections(self):
|
||||
"""Verify table is created with correct subsection breakdown."""
|
||||
mock_attr1 = Mock()
|
||||
mock_attr1.SubSection = (
|
||||
"1.1 Policy on the security of network and information systems"
|
||||
)
|
||||
|
||||
mock_attr2 = Mock()
|
||||
mock_attr2.SubSection = "1.2 Roles, responsibilities and authorities"
|
||||
|
||||
requirements_list = [
|
||||
{
|
||||
"id": "1.1.1.a",
|
||||
"description": "Test 1",
|
||||
"attributes": {"status": StatusChoices.PASS},
|
||||
},
|
||||
{
|
||||
"id": "1.1.1.b",
|
||||
"description": "Test 2",
|
||||
"attributes": {"status": StatusChoices.FAIL},
|
||||
},
|
||||
{
|
||||
"id": "1.2.1",
|
||||
"description": "Test 3",
|
||||
"attributes": {"status": StatusChoices.MANUAL},
|
||||
},
|
||||
]
|
||||
|
||||
attributes_by_requirement_id = {
|
||||
"1.1.1.a": {"attributes": {"req_attributes": [mock_attr1]}},
|
||||
"1.1.1.b": {"attributes": {"req_attributes": [mock_attr1]}},
|
||||
"1.2.1": {"attributes": {"req_attributes": [mock_attr2]}},
|
||||
}
|
||||
|
||||
# Call function
|
||||
result = _create_nis2_subsection_table(
|
||||
requirements_list, attributes_by_requirement_id
|
||||
)
|
||||
|
||||
# Verify result is a Table
|
||||
assert isinstance(result, Table)
|
||||
|
||||
# Verify table has correct structure (header + data rows)
|
||||
assert len(result._cellvalues) > 1 # At least header + 1 row
|
||||
|
||||
# Verify header row
|
||||
assert result._cellvalues[0][0] == "SubSection"
|
||||
assert result._cellvalues[0][1] == "Total"
|
||||
assert result._cellvalues[0][2] == "Pass"
|
||||
assert result._cellvalues[0][3] == "Fail"
|
||||
assert result._cellvalues[0][4] == "Manual"
|
||||
assert result._cellvalues[0][5] == "Compliance %"
|
||||
|
||||
def test_table_has_correct_styling(self):
|
||||
"""Verify table has NIS2 styling applied."""
|
||||
mock_attr = Mock()
|
||||
mock_attr.SubSection = "Test SubSection"
|
||||
|
||||
requirements_list = [
|
||||
{
|
||||
"id": "1.1.1.a",
|
||||
"description": "Test",
|
||||
"attributes": {"status": StatusChoices.PASS},
|
||||
}
|
||||
]
|
||||
|
||||
attributes_by_requirement_id = {
|
||||
"1.1.1.a": {"attributes": {"req_attributes": [mock_attr]}}
|
||||
}
|
||||
|
||||
result = _create_nis2_subsection_table(
|
||||
requirements_list, attributes_by_requirement_id
|
||||
)
|
||||
|
||||
# Verify styling is applied
|
||||
assert isinstance(result._cellStyles, list)
|
||||
assert len(result._cellStyles) > 0
|
||||
|
||||
|
||||
class TestNIS2RequirementsIndex:
|
||||
"""Test suite for _create_nis2_requirements_index function."""
|
||||
|
||||
def test_creates_hierarchical_index(self):
|
||||
"""Verify index creates hierarchical structure by Section and SubSection."""
|
||||
pdf_styles = _create_pdf_styles()
|
||||
|
||||
mock_attr1 = Mock()
|
||||
mock_attr1.Section = "1 POLICY ON SECURITY"
|
||||
mock_attr1.SubSection = "1.1 Policy definition"
|
||||
|
||||
mock_attr2 = Mock()
|
||||
mock_attr2.Section = "1 POLICY ON SECURITY"
|
||||
mock_attr2.SubSection = "1.2 Roles and responsibilities"
|
||||
|
||||
requirements_list = [
|
||||
{
|
||||
"id": "1.1.1.a",
|
||||
"description": "Define security policies",
|
||||
"attributes": {"status": StatusChoices.PASS},
|
||||
},
|
||||
{
|
||||
"id": "1.2.1",
|
||||
"description": "Assign security roles",
|
||||
"attributes": {"status": StatusChoices.FAIL},
|
||||
},
|
||||
]
|
||||
|
||||
attributes_by_requirement_id = {
|
||||
"1.1.1.a": {"attributes": {"req_attributes": [mock_attr1]}},
|
||||
"1.2.1": {"attributes": {"req_attributes": [mock_attr2]}},
|
||||
}
|
||||
|
||||
# Call function
|
||||
result = _create_nis2_requirements_index(
|
||||
requirements_list,
|
||||
attributes_by_requirement_id,
|
||||
pdf_styles["h2"],
|
||||
pdf_styles["h3"],
|
||||
pdf_styles["normal"],
|
||||
)
|
||||
|
||||
# Verify result is a list of elements
|
||||
assert isinstance(result, list)
|
||||
assert len(result) > 0
|
||||
|
||||
def test_includes_status_indicators(self):
|
||||
"""Verify index includes status indicators (✓, ✗, ⊙)."""
|
||||
pdf_styles = _create_pdf_styles()
|
||||
|
||||
mock_attr = Mock()
|
||||
mock_attr.Section = "Test Section"
|
||||
mock_attr.SubSection = "Test SubSection"
|
||||
|
||||
requirements_list = [
|
||||
{
|
||||
"id": "test.1",
|
||||
"description": "Passed requirement",
|
||||
"attributes": {"status": StatusChoices.PASS},
|
||||
},
|
||||
{
|
||||
"id": "test.2",
|
||||
"description": "Failed requirement",
|
||||
"attributes": {"status": StatusChoices.FAIL},
|
||||
},
|
||||
{
|
||||
"id": "test.3",
|
||||
"description": "Manual requirement",
|
||||
"attributes": {"status": StatusChoices.MANUAL},
|
||||
},
|
||||
]
|
||||
|
||||
attributes_by_requirement_id = {
|
||||
"test.1": {"attributes": {"req_attributes": [mock_attr]}},
|
||||
"test.2": {"attributes": {"req_attributes": [mock_attr]}},
|
||||
"test.3": {"attributes": {"req_attributes": [mock_attr]}},
|
||||
}
|
||||
|
||||
result = _create_nis2_requirements_index(
|
||||
requirements_list,
|
||||
attributes_by_requirement_id,
|
||||
pdf_styles["h2"],
|
||||
pdf_styles["h3"],
|
||||
pdf_styles["normal"],
|
||||
)
|
||||
|
||||
# Convert paragraphs to text and check for status indicators
|
||||
str(result)
|
||||
# Status indicators should be present in the generated content
|
||||
assert len(result) > 0
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestGenerateNIS2Report:
|
||||
"""Test suite for generate_nis2_report function."""
|
||||
|
||||
@patch("tasks.jobs.report.initialize_prowler_provider")
|
||||
@patch("tasks.jobs.report.Provider.objects.get")
|
||||
@patch("tasks.jobs.report.ScanSummary.objects.filter")
|
||||
@patch("tasks.jobs.report.Compliance.get_bulk")
|
||||
@patch("tasks.jobs.report.SimpleDocTemplate")
|
||||
def test_generates_nis2_report_successfully(
|
||||
self,
|
||||
mock_doc,
|
||||
mock_compliance,
|
||||
mock_scan_summary,
|
||||
mock_provider_get,
|
||||
mock_init_provider,
|
||||
tenants_fixture,
|
||||
scans_fixture,
|
||||
):
|
||||
"""Verify NIS2 report generation completes successfully."""
|
||||
tenant = tenants_fixture[0]
|
||||
scan = scans_fixture[0]
|
||||
|
||||
# Setup mocks
|
||||
mock_provider = Mock()
|
||||
mock_provider.provider = "aws"
|
||||
mock_provider.uid = "provider-123"
|
||||
mock_provider_get.return_value = mock_provider
|
||||
|
||||
mock_scan_summary.return_value.exists.return_value = True
|
||||
|
||||
# Mock compliance object
|
||||
mock_compliance_obj = Mock()
|
||||
mock_compliance_obj.Framework = "NIS2"
|
||||
mock_compliance_obj.Name = "Network and Information Security Directive"
|
||||
mock_compliance_obj.Version = ""
|
||||
mock_compliance_obj.Description = "NIS2 Directive"
|
||||
mock_compliance_obj.Requirements = []
|
||||
|
||||
mock_compliance.return_value = {"nis2_aws": mock_compliance_obj}
|
||||
|
||||
mock_init_provider.return_value = MagicMock()
|
||||
mock_doc_instance = Mock()
|
||||
mock_doc.return_value = mock_doc_instance
|
||||
|
||||
expected_output_path = "/tmp/test_nis2.pdf"
|
||||
|
||||
# Call function
|
||||
with patch("tasks.jobs.report.rls_transaction"):
|
||||
with patch(
|
||||
"tasks.jobs.report._aggregate_requirement_statistics_from_database"
|
||||
) as mock_aggregate:
|
||||
mock_aggregate.return_value = {}
|
||||
|
||||
with patch(
|
||||
"tasks.jobs.report._calculate_requirements_data_from_statistics"
|
||||
) as mock_calculate:
|
||||
mock_calculate.return_value = ({}, [])
|
||||
|
||||
# Should not raise exception
|
||||
generate_nis2_report(
|
||||
tenant_id=str(tenant.id),
|
||||
scan_id=str(scan.id),
|
||||
compliance_id="nis2_aws",
|
||||
output_path=expected_output_path,
|
||||
provider_id="provider-123",
|
||||
only_failed=True,
|
||||
)
|
||||
|
||||
# Verify SimpleDocTemplate was initialized with correct output path
|
||||
mock_doc.assert_called_once()
|
||||
call_args = mock_doc.call_args
|
||||
assert call_args[0][0] == expected_output_path, (
|
||||
f"Expected SimpleDocTemplate to be called with {expected_output_path}, "
|
||||
f"but got {call_args[0][0]}"
|
||||
)
|
||||
|
||||
# Verify PDF was built
|
||||
mock_doc_instance.build.assert_called_once()
|
||||
|
||||
# Verify initialize_prowler_provider was called with the provider
|
||||
mock_init_provider.assert_called_once_with(mock_provider)
|
||||
|
||||
def test_nis2_colors_are_defined(self):
|
||||
"""Verify NIS2 specific colors are defined."""
|
||||
# Check that NIS2 primary color exists
|
||||
assert COLOR_NIS2_PRIMARY is not None
|
||||
assert isinstance(COLOR_NIS2_PRIMARY, colors.Color)
|
||||
|
||||
Reference in New Issue
Block a user