From a652e28b4a6aede7b36190c37fefd7fdcdb6cfca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Mart=C3=ADn?= Date: Tue, 2 Jun 2026 11:37:05 +0200 Subject: [PATCH] fix(api): clean up scan tmp output failure to avoid disk fill (#11421) Co-authored-by: Pepe Fagoaga --- api/CHANGELOG.md | 8 ++ api/src/backend/api/tests/test_views.py | 164 ++++++++++++++++++++-- api/src/backend/api/v1/views.py | 34 +++-- api/src/backend/tasks/tasks.py | 30 +++- api/src/backend/tasks/tests/test_tasks.py | 34 +++++ prowler/CHANGELOG.md | 8 ++ prowler/lib/outputs/ocsf/ocsf.py | 8 ++ tests/lib/outputs/ocsf/ocsf_test.py | 32 +++++ 8 files changed, 292 insertions(+), 26 deletions(-) diff --git a/api/CHANGELOG.md b/api/CHANGELOG.md index bbc675c8d2..fbc7f178ad 100644 --- a/api/CHANGELOG.md +++ b/api/CHANGELOG.md @@ -2,6 +2,14 @@ All notable changes to the **Prowler API** are documented in this file. +## [1.30.1] (Prowler v5.29.1) + +### 🐞 Fixed + +- Clean up the scan tmp output directory when `scan-report` fails so partial files do not accumulate and fill the worker disk (`No space left on device`) [(#11421)](https://github.com/prowler-cloud/prowler/pull/11421) + +--- + ## [1.30.0] (Prowler v5.29.0) ### 🔄 Changed diff --git a/api/src/backend/api/tests/test_views.py b/api/src/backend/api/tests/test_views.py index ebd9128407..9e0b4362a3 100644 --- a/api/src/backend/api/tests/test_views.py +++ b/api/src/backend/api/tests/test_views.py @@ -3856,16 +3856,20 @@ class TestScanViewSet: scan.output_location = "dummy" scan.save() - dummy_task = Task.objects.create(tenant_id=scan.tenant_id) - dummy_task.id = "dummy-task-id" - dummy_task_data = {"id": dummy_task.id, "state": StateChoices.EXECUTING} + task_result = TaskResult.objects.create( + task_id=str(uuid4()), + task_name="scan-report", + task_kwargs={"scan_id": str(scan.id)}, + ) + task = Task.objects.create( + tenant_id=scan.tenant_id, + task_runner_task=task_result, + ) + dummy_task_data = {"id": str(task.id), "state": StateChoices.EXECUTING} - with ( - patch("api.v1.views.Task.objects.get", return_value=dummy_task), - patch( - "api.v1.views.TaskSerializer", - return_value=type("DummySerializer", (), {"data": dummy_task_data}), - ), + with patch( + "api.v1.views.TaskSerializer", + return_value=type("DummySerializer", (), {"data": dummy_task_data}), ): url = reverse("scan-report", kwargs={"pk": scan.id}) response = authenticated_client.get(url) @@ -4186,6 +4190,88 @@ class TestScanViewSet: assert resp.status_code == status.HTTP_302_FOUND assert resp["Location"] == presigned_url + def test_compliance_s3_returns_latest_match( + self, authenticated_client, scans_fixture, monkeypatch + ): + """When several files match, the most recently modified one is served.""" + scan = scans_fixture[0] + bucket = "bucket" + scan.output_location = f"s3://{bucket}/path/scan.zip" + scan.state = StateChoices.COMPLETED + scan.save() + + monkeypatch.setattr( + "api.v1.views.env", + type("env", (), {"str": lambda self, *args, **kwargs: "test-bucket"})(), + ) + + old_key = "path/compliance/prowler-output-aws-20240101000000_cis_1.4_aws.csv" + latest_key = "path/compliance/prowler-output-aws-20240202000000_cis_1.4_aws.csv" + + class FakeS3Client: + def list_objects_v2(self, Bucket, Prefix): + return { + "Contents": [ + { + "Key": old_key, + "LastModified": datetime(2024, 1, 1, tzinfo=timezone.utc), + }, + { + "Key": latest_key, + "LastModified": datetime(2024, 2, 2, tzinfo=timezone.utc), + }, + ] + } + + def generate_presigned_url(self, ClientMethod, Params, ExpiresIn): + assert Params["Key"] == latest_key + return "https://test-bucket.s3.amazonaws.com/latest" + + monkeypatch.setattr("api.v1.views.get_s3_client", lambda: FakeS3Client()) + + url = reverse("scan-compliance", kwargs={"pk": scan.id, "name": "cis_1.4_aws"}) + resp = authenticated_client.get(url) + assert resp.status_code == status.HTTP_302_FOUND + assert resp["Location"].endswith("/latest") + + def test_compliance_local_returns_latest_match( + self, authenticated_client, scans_fixture, monkeypatch + ): + """The local branch serves the most recently modified matching file.""" + scan = scans_fixture[0] + scan.state = StateChoices.COMPLETED + + with tempfile.TemporaryDirectory() as tmp: + comp_dir = Path(tmp) / "reports" / "compliance" + comp_dir.mkdir(parents=True, exist_ok=True) + + old_file = comp_dir / "prowler-output-aws-20240101000000_cis_1.4_aws.csv" + old_file.write_bytes(b"old") + latest_file = comp_dir / "prowler-output-aws-20240202000000_cis_1.4_aws.csv" + latest_file.write_bytes(b"latest") + # Make `latest_file` newer regardless of creation order. + os.utime(old_file, (1_700_000_000, 1_700_000_000)) + os.utime(latest_file, (1_700_000_100, 1_700_000_100)) + + scan.output_location = str(Path(tmp) / "reports" / "scan.zip") + scan.save() + + monkeypatch.setattr( + glob, + "glob", + lambda p: [str(old_file), str(latest_file)], + ) + + url = reverse( + "scan-compliance", kwargs={"pk": scan.id, "name": "cis_1.4_aws"} + ) + resp = authenticated_client.get(url) + assert resp.status_code == status.HTTP_200_OK + assert resp.content == b"latest" + assert resp["Content-Disposition"].endswith( + f'filename="{latest_file.name}"' + ) + def test_compliance_s3_not_found( self, authenticated_client, scans_fixture, monkeypatch ): @@ -4294,18 +4380,24 @@ class TestScanViewSet: assert cd.startswith('attachment; filename="') assert cd.endswith(f'filename="{fname.name}"') - @patch("api.v1.views.Task.objects.get") @patch("api.v1.views.TaskSerializer") def test__get_task_status_returns_none_if_task_not_executing( - self, mock_task_serializer, mock_task_get, authenticated_client, scans_fixture + self, mock_task_serializer, authenticated_client, scans_fixture ): scan = scans_fixture[0] scan.state = StateChoices.COMPLETED scan.output_location = "dummy" scan.save() - task = Task.objects.create(tenant_id=scan.tenant_id) - mock_task_get.return_value = task + task_result = TaskResult.objects.create( + task_id=str(uuid4()), + task_name="scan-report", + task_kwargs={"scan_id": str(scan.id)}, + ) + task = Task.objects.create( + tenant_id=scan.tenant_id, + task_runner_task=task_result, + ) mock_task_serializer.return_value.data = { "id": str(task.id), "state": StateChoices.COMPLETED, @@ -4326,6 +4418,7 @@ class TestScanViewSet: scan.save() task_result = TaskResult.objects.create( + task_id=str(uuid4()), task_name="scan-report", task_kwargs={"scan_id": str(scan.id)}, ) @@ -4346,6 +4439,51 @@ class TestScanViewSet: assert response.status_code == status.HTTP_202_ACCEPTED assert response.data["id"] == str(task.id) + @patch("api.v1.views.TaskSerializer") + def test__get_task_status_returns_latest_task( + self, mock_task_serializer, authenticated_client, scans_fixture + ): + """With several scan-report tasks for the scan, the most recent is used.""" + scan = scans_fixture[0] + scan.state = StateChoices.COMPLETED + scan.output_location = "dummy" + scan.save() + + old_task = Task.objects.create( + tenant_id=scan.tenant_id, + task_runner_task=TaskResult.objects.create( + task_id=str(uuid4()), + task_name="scan-report", + task_kwargs={"scan_id": str(scan.id)}, + ), + ) + new_task = Task.objects.create( + tenant_id=scan.tenant_id, + task_runner_task=TaskResult.objects.create( + task_id=str(uuid4()), + task_name="scan-report", + task_kwargs={"scan_id": str(scan.id)}, + ), + ) + # `inserted_at` is `auto_now_add`, and within the test transaction the DB + # `now()` is constant, so force distinct timestamps to make order_by stable. + base = datetime(2024, 1, 1, tzinfo=timezone.utc) + Task.objects.filter(pk=old_task.pk).update(inserted_at=base) + Task.objects.filter(pk=new_task.pk).update( + inserted_at=base + timedelta(hours=1) + ) + + mock_task_serializer.side_effect = lambda instance, *a, **k: SimpleNamespace( + data={"id": str(instance.id), "state": StateChoices.EXECUTING} + ) + + url = reverse("scan-report", kwargs={"pk": scan.id}) + response = authenticated_client.get(url) + + assert response.status_code == status.HTTP_202_ACCEPTED + assert str(new_task.id) in response["Content-Location"] + assert str(old_task.id) not in response["Content-Location"] + @patch("api.v1.views.get_s3_client") @patch("api.v1.views.sentry_sdk.capture_exception") def test_compliance_list_objects_client_error( diff --git a/api/src/backend/api/v1/views.py b/api/src/backend/api/v1/views.py index 067a0403a3..bf5cdcc54b 100644 --- a/api/src/backend/api/v1/views.py +++ b/api/src/backend/api/v1/views.py @@ -2059,12 +2059,17 @@ class ScanViewSet(BaseRLSViewSet): if scan_instance.state == StateChoices.EXECUTING and scan_instance.task: task = scan_instance.task else: - try: - task = Task.objects.get( + # A scan can have several `scan-report` tasks (e.g. re-runs); take the + # most recent one. `.first()` also avoids `MultipleObjectsReturned`. + task = ( + Task.objects.filter( task_runner_task__task_name="scan-report", task_runner_task__task_kwargs__contains=str(scan_instance.id), ) - except Task.DoesNotExist: + .order_by("-inserted_at") + .first() + ) + if task is None: return None self.response_serializer_class = TaskSerializer @@ -2139,27 +2144,32 @@ class ScanViewSet(BaseRLSViewSet): status=status.HTTP_502_BAD_GATEWAY, ) contents = resp.get("Contents", []) - keys = [] + matches = [] for obj in contents: key = obj["Key"] key_basename = os.path.basename(key) if any(ch in suffix for ch in ("*", "?", "[")): if fnmatch.fnmatch(key_basename, suffix): - keys.append(key) + matches.append(obj) elif key_basename == suffix: - keys.append(key) + matches.append(obj) elif key.endswith(suffix): # Backward compatibility if suffix already includes directories - keys.append(key) - if not keys: + matches.append(obj) + if not matches: return Response( { "detail": f"No compliance file found for name '{os.path.splitext(suffix)[0]}'." }, status=status.HTTP_404_NOT_FOUND, ) - # path_pattern here is prefix, but in compliance we build correct suffix check before - key = keys[0] + # Return the most recently modified match (latest report) when + # several files share the prefix/suffix. `list_objects_v2` always + # returns `LastModified`; the fallback keeps ordering deterministic + # if it is ever absent. + key = max(matches, key=lambda o: (o.get("LastModified", ""), o["Key"]))[ + "Key" + ] else: # path_pattern is exact key; HEAD before presigning to preserve the 404 contract. key = path_pattern @@ -2209,7 +2219,9 @@ class ScanViewSet(BaseRLSViewSet): }, status=status.HTTP_404_NOT_FOUND, ) - filepath = files[0] + # Return the most recently modified match (latest report) when the + # pattern resolves to several files. + filepath = max(files, key=os.path.getmtime) with open(filepath, "rb") as f: content = f.read() filename = os.path.basename(filepath) diff --git a/api/src/backend/tasks/tasks.py b/api/src/backend/tasks/tasks.py index 0fda2999a9..b35b68893d 100644 --- a/api/src/backend/tasks/tasks.py +++ b/api/src/backend/tasks/tasks.py @@ -467,8 +467,31 @@ def delete_tenant_task(tenant_id: str): return delete_tenant(pk=tenant_id) +def _scan_tmp_output_directory(tenant_id: str, scan_id: str) -> Path: + """Root tmp output directory for a scan ({tmp}/{tenant_id}/{scan_id}).""" + return Path(DJANGO_TMP_OUTPUT_DIRECTORY) / str(tenant_id) / str(scan_id) + + +class ScanReportRLSTask(RLSTask): + """ + RLS task that removes the scan's tmp output directory when the task fails. + + Covers failures both inside and outside the task body (e.g. ENOSPC mid-write, + or setup errors) so partial artifacts do not accumulate on the worker disk. + """ + + def on_failure(self, exc, task_id, args, kwargs, _einfo): # noqa: ARG002 + del args # Required by Celery's Task.on_failure signature; not used. + tenant_id = kwargs.get("tenant_id") + scan_id = kwargs.get("scan_id") + + if tenant_id and scan_id: + logger.error(f"Scan report task {task_id} failed: {exc}") + rmtree(_scan_tmp_output_directory(tenant_id, scan_id), ignore_errors=True) + + @shared_task( - base=RLSTask, + base=ScanReportRLSTask, name="scan-report", queue="scan-reports", ) @@ -518,6 +541,9 @@ def generate_outputs_task(scan_id: str, provider_id: str, tenant_id: str): out_dir, comp_dir = _generate_output_directory( DJANGO_TMP_OUTPUT_DIRECTORY, provider_uid, tenant_id, scan_id ) + # Removed on success here and on failure by ScanReportRLSTask.on_failure, + # so partial artifacts do not accumulate and fill the disk (ENOSPC). + scan_tmp_dir = _scan_tmp_output_directory(tenant_id, scan_id) def get_writer(writer_map, name, factory, is_last): """ @@ -666,7 +692,7 @@ def generate_outputs_task(scan_id: str, provider_id: str, tenant_id: str): # TODO: We need to create a new periodic task to delete the output files # This task shouldn't be responsible for deleting the output files try: - rmtree(Path(compressed).parent, ignore_errors=True) + rmtree(scan_tmp_dir, ignore_errors=True) except Exception as e: logger.error(f"Error deleting output files: {e}") final_location, did_upload = upload_uri, True diff --git a/api/src/backend/tasks/tests/test_tasks.py b/api/src/backend/tasks/tests/test_tasks.py index f15bdf7192..7a52c27323 100644 --- a/api/src/backend/tasks/tests/test_tasks.py +++ b/api/src/backend/tasks/tests/test_tasks.py @@ -15,8 +15,10 @@ from tasks.jobs.lighthouse_providers import ( from tasks.tasks import ( DJANGO_TMP_OUTPUT_DIRECTORY, STALE_TMP_OUTPUT_MAX_AGE_HOURS, + ScanReportRLSTask, _cleanup_orphan_scheduled_scans, _perform_scan_complete_tasks, + _scan_tmp_output_directory, check_integrations_task, check_lighthouse_provider_connection_task, generate_outputs_task, @@ -771,6 +773,38 @@ class TestGenerateOutputs: mock_s3_task.assert_called_once() +class TestScanReportRLSTaskOnFailure: + def test_on_failure_removes_scan_tmp_directory(self): + task = ScanReportRLSTask() + + with patch("tasks.tasks.rmtree") as mock_rmtree: + task.on_failure( + exc=OSError("No space left on device"), + task_id="task-abc", + args=(), + kwargs={"tenant_id": "t-1", "scan_id": "s-1"}, + _einfo=None, + ) + + mock_rmtree.assert_called_once_with( + _scan_tmp_output_directory("t-1", "s-1"), ignore_errors=True + ) + + def test_on_failure_skips_when_missing_kwargs(self): + task = ScanReportRLSTask() + + with patch("tasks.tasks.rmtree") as mock_rmtree: + task.on_failure( + exc=OSError("No space left on device"), + task_id="task-abc", + args=(), + kwargs={}, + _einfo=None, + ) + + mock_rmtree.assert_not_called() + + class TestScanCompleteTasks: @patch("tasks.tasks.aggregate_attack_surface_task.apply_async") @patch("tasks.tasks.chain") diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md index ea1707c71b..cbf3e84737 100644 --- a/prowler/CHANGELOG.md +++ b/prowler/CHANGELOG.md @@ -2,6 +2,14 @@ All notable changes to the **Prowler SDK** are documented in this file. +## [5.29.1] (Prowler v5.29.1) + +### 🐞 Fixed + +- OCSF output writer now re-raises I/O errors (e.g. `ENOSPC`) instead of logging them per finding and leaving a truncated file [(#11421)](https://github.com/prowler-cloud/prowler/pull/11421) + +--- + ## [5.29.0] (Prowler v5.29.0) ### 🚀 Added diff --git a/prowler/lib/outputs/ocsf/ocsf.py b/prowler/lib/outputs/ocsf/ocsf.py index 731d029284..53f27d0e1b 100644 --- a/prowler/lib/outputs/ocsf/ocsf.py +++ b/prowler/lib/outputs/ocsf/ocsf.py @@ -227,6 +227,10 @@ class OCSF(Output): json_output = finding.json(exclude_none=True, indent=4) self._file_descriptor.write(json_output) self._file_descriptor.write(",") + except OSError: + # I/O errors (e.g. ENOSPC) are not recoverable per finding: + # fail fast instead of logging once per finding. + raise except Exception as error: logger.error( f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" @@ -239,6 +243,10 @@ class OCSF(Output): self._file_descriptor.truncate() self._file_descriptor.write("]") self._file_descriptor.close() + except OSError: + # Propagate unrecoverable I/O errors (e.g. ENOSPC) so the caller can + # fail fast instead of producing a corrupt output file. + raise except Exception as error: logger.error( f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" diff --git a/tests/lib/outputs/ocsf/ocsf_test.py b/tests/lib/outputs/ocsf/ocsf_test.py index 16ff1ce317..f449fd49f7 100644 --- a/tests/lib/outputs/ocsf/ocsf_test.py +++ b/tests/lib/outputs/ocsf/ocsf_test.py @@ -2,8 +2,10 @@ import json from datetime import datetime, timezone from io import StringIO from typing import Optional +from unittest.mock import MagicMock from uuid import UUID +import pytest import requests from freezegun import freeze_time from mock import patch @@ -300,6 +302,36 @@ class TestOCSF: def test_batch_write_data_to_file_without_findings(self): assert not OCSF([])._file_descriptor + def test_batch_write_data_to_file_propagates_oserror(self): + """An I/O error (e.g. ENOSPC) while writing a finding must propagate + instead of being swallowed, so the caller can fail fast.""" + findings = [ + generate_finding_output( + status="FAIL", + severity="low", + muted=False, + region=AWS_REGION_EU_WEST_1, + timestamp=datetime.now(), + resource_details="resource_details", + resource_name="resource_name", + resource_uid="resource-id", + status_extended="status extended", + ) + ] + + output = OCSF(findings) + mock_file = MagicMock() + mock_file.closed = False + # Non-zero so the "[" prelude is skipped and the failure happens on the + # per-finding write, the exact path that hit ENOSPC in production. + mock_file.tell.return_value = 1 + mock_file.write.side_effect = OSError(28, "No space left on device") + output._file_descriptor = mock_file + + with pytest.raises(OSError) as excinfo: + output.batch_write_data_to_file() + assert excinfo.value.errno == 28 + def test_finding_output_cloud_pass_low_muted(self): finding_output = generate_finding_output( status="PASS",