From 02b58d8a315ec0fb44b62558eca3c9424664afaf Mon Sep 17 00:00:00 2001 From: Josema Camacho Date: Fri, 13 Feb 2026 13:20:38 +0100 Subject: [PATCH] fix(api): mark attack paths scan as failed when celery task fails (#10065) --- api/CHANGELOG.md | 1 + api/poetry.lock | 56 +---- .../tasks/jobs/attack_paths/db_utils.py | 27 ++- .../backend/tasks/jobs/attack_paths/scan.py | 10 +- api/src/backend/tasks/tasks.py | 20 +- .../tasks/tests/test_attack_paths_scan.py | 224 +++++++++++++++++- 6 files changed, 283 insertions(+), 55 deletions(-) diff --git a/api/CHANGELOG.md b/api/CHANGELOG.md index 000148d4b2..17d4eda45b 100644 --- a/api/CHANGELOG.md +++ b/api/CHANGELOG.md @@ -18,6 +18,7 @@ All notable changes to the **Prowler API** are documented in this file. - Support CSA CCM 4.0 for the Azure provider [(#10039)](https://github.com/prowler-cloud/prowler/pull/10039) - Support CSA CCM 4.0 for the Oracle Cloud provider [(#10057)](https://github.com/prowler-cloud/prowler/pull/10057) - Support CSA CCM 4.0 for the Alibaba Cloud provider [(#10061)](https://github.com/prowler-cloud/prowler/pull/10061) +- Attack Paths: Mark attack Paths scan as failed when Celery task fails outside job error handling [(#10065)](https://github.com/prowler-cloud/prowler/pull/10065) ### 🔐 Security diff --git a/api/poetry.lock b/api/poetry.lock index 7c8d070591..203147d55f 100644 --- a/api/poetry.lock +++ b/api/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.4 and should not be changed by hand. [[package]] name = "about-time" @@ -5836,46 +5836,6 @@ files = [ {file = "nest_asyncio-1.6.0.tar.gz", hash = "sha256:6f172d5449aca15afd6c646851f4e31e02c598d553a667e38cafa997cfec55fe"}, ] -[[package]] -name = "netifaces" -version = "0.11.0" -description = "Portable network interface information." -optional = false -python-versions = "*" -groups = ["main"] -files = [ - {file = "netifaces-0.11.0-cp27-cp27m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:eb4813b77d5df99903af4757ce980a98c4d702bbcb81f32a0b305a1537bdf0b1"}, - {file = "netifaces-0.11.0-cp27-cp27m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:5f9ca13babe4d845e400921973f6165a4c2f9f3379c7abfc7478160e25d196a4"}, - {file = "netifaces-0.11.0-cp27-cp27m-win32.whl", hash = "sha256:7dbb71ea26d304e78ccccf6faccef71bb27ea35e259fb883cfd7fd7b4f17ecb1"}, - {file = "netifaces-0.11.0-cp27-cp27m-win_amd64.whl", hash = "sha256:0f6133ac02521270d9f7c490f0c8c60638ff4aec8338efeff10a1b51506abe85"}, - {file = "netifaces-0.11.0-cp27-cp27mu-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:08e3f102a59f9eaef70948340aeb6c89bd09734e0dca0f3b82720305729f63ea"}, - {file = "netifaces-0.11.0-cp27-cp27mu-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:c03fb2d4ef4e393f2e6ffc6376410a22a3544f164b336b3a355226653e5efd89"}, - {file = "netifaces-0.11.0-cp34-cp34m-win32.whl", hash = "sha256:73ff21559675150d31deea8f1f8d7e9a9a7e4688732a94d71327082f517fc6b4"}, - {file = "netifaces-0.11.0-cp35-cp35m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:815eafdf8b8f2e61370afc6add6194bd5a7252ae44c667e96c4c1ecf418811e4"}, - {file = "netifaces-0.11.0-cp35-cp35m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:50721858c935a76b83dd0dd1ab472cad0a3ef540a1408057624604002fcfb45b"}, - {file = "netifaces-0.11.0-cp35-cp35m-win32.whl", hash = "sha256:c9a3a47cd3aaeb71e93e681d9816c56406ed755b9442e981b07e3618fb71d2ac"}, - {file = "netifaces-0.11.0-cp36-cp36m-macosx_10_15_x86_64.whl", hash = "sha256:aab1dbfdc55086c789f0eb37affccf47b895b98d490738b81f3b2360100426be"}, - {file = "netifaces-0.11.0-cp36-cp36m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:c37a1ca83825bc6f54dddf5277e9c65dec2f1b4d0ba44b8fd42bc30c91aa6ea1"}, - {file = "netifaces-0.11.0-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:28f4bf3a1361ab3ed93c5ef360c8b7d4a4ae060176a3529e72e5e4ffc4afd8b0"}, - {file = "netifaces-0.11.0-cp36-cp36m-win32.whl", hash = "sha256:2650beee182fed66617e18474b943e72e52f10a24dc8cac1db36c41ee9c041b7"}, - {file = "netifaces-0.11.0-cp36-cp36m-win_amd64.whl", hash = "sha256:cb925e1ca024d6f9b4f9b01d83215fd00fe69d095d0255ff3f64bffda74025c8"}, - {file = "netifaces-0.11.0-cp37-cp37m-macosx_10_15_x86_64.whl", hash = "sha256:84e4d2e6973eccc52778735befc01638498781ce0e39aa2044ccfd2385c03246"}, - {file = "netifaces-0.11.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:18917fbbdcb2d4f897153c5ddbb56b31fa6dd7c3fa9608b7e3c3a663df8206b5"}, - {file = "netifaces-0.11.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:48324183af7f1bc44f5f197f3dad54a809ad1ef0c78baee2c88f16a5de02c4c9"}, - {file = "netifaces-0.11.0-cp37-cp37m-win32.whl", hash = "sha256:8f7da24eab0d4184715d96208b38d373fd15c37b0dafb74756c638bd619ba150"}, - {file = "netifaces-0.11.0-cp37-cp37m-win_amd64.whl", hash = "sha256:2479bb4bb50968089a7c045f24d120f37026d7e802ec134c4490eae994c729b5"}, - {file = "netifaces-0.11.0-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:3ecb3f37c31d5d51d2a4d935cfa81c9bc956687c6f5237021b36d6fdc2815b2c"}, - {file = "netifaces-0.11.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:96c0fe9696398253f93482c84814f0e7290eee0bfec11563bd07d80d701280c3"}, - {file = "netifaces-0.11.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:c92ff9ac7c2282009fe0dcb67ee3cd17978cffbe0c8f4b471c00fe4325c9b4d4"}, - {file = "netifaces-0.11.0-cp38-cp38-win32.whl", hash = "sha256:d07b01c51b0b6ceb0f09fc48ec58debd99d2c8430b09e56651addeaf5de48048"}, - {file = "netifaces-0.11.0-cp38-cp38-win_amd64.whl", hash = "sha256:469fc61034f3daf095e02f9f1bbac07927b826c76b745207287bc594884cfd05"}, - {file = "netifaces-0.11.0-cp39-cp39-macosx_10_15_x86_64.whl", hash = "sha256:5be83986100ed1fdfa78f11ccff9e4757297735ac17391b95e17e74335c2047d"}, - {file = "netifaces-0.11.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:54ff6624eb95b8a07e79aa8817288659af174e954cca24cdb0daeeddfc03c4ff"}, - {file = "netifaces-0.11.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:841aa21110a20dc1621e3dd9f922c64ca64dd1eb213c47267a2c324d823f6c8f"}, - {file = "netifaces-0.11.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:e76c7f351e0444721e85f975ae92718e21c1f361bda946d60a214061de1f00a1"}, - {file = "netifaces-0.11.0.tar.gz", hash = "sha256:043a79146eb2907edf439899f262b3dfe41717d34124298ed281139a8b93ca32"}, -] - [[package]] name = "nltk" version = "3.9.2" @@ -6043,14 +6003,14 @@ voice-helpers = ["numpy (>=2.0.2)", "sounddevice (>=0.5.1)"] [[package]] name = "openstacksdk" -version = "4.0.1" +version = "4.2.0" description = "An SDK for building applications to work with OpenStack" optional = false -python-versions = ">=3.8" +python-versions = ">=3.9" groups = ["main"] files = [ - {file = "openstacksdk-4.0.1-py3-none-any.whl", hash = "sha256:d63187a006fff7c1de1486c9e2e1073a787af402620c3c0ed0cf5291225998ac"}, - {file = "openstacksdk-4.0.1.tar.gz", hash = "sha256:19faa1d5e6a78a2c1dc06a171e65e776ba82e9df23e1d08586225dc5ade9fc63"}, + {file = "openstacksdk-4.2.0-py3-none-any.whl", hash = "sha256:238be0fa5d9899872b00787ab38e84f92fd6dc87525fde0965dadcdc12196dc6"}, + {file = "openstacksdk-4.2.0.tar.gz", hash = "sha256:5cb9450dcce8054a2caf89d8be9e55057ddfa219a954e781032241eb29280445"}, ] [package.dependencies] @@ -6061,10 +6021,10 @@ iso8601 = ">=0.1.11" jmespath = ">=0.9.0" jsonpatch = ">=1.16,<1.20 || >1.20" keystoneauth1 = ">=3.18.0" -netifaces = ">=0.10.4" os-service-types = ">=1.7.0" pbr = ">=2.0.0,<2.1.0 || >2.1.0" platformdirs = ">=3" +psutil = ">=3.2.2" PyYAML = ">=3.13" requestsexceptions = ">=1.2.0" @@ -6736,7 +6696,7 @@ microsoft-kiota-abstractions = "1.9.2" msgraph-sdk = "1.23.0" numpy = "2.0.2" oci = "2.160.3" -openstacksdk = "4.0.1" +openstacksdk = "4.2.0" pandas = "2.2.3" py-iam-expand = "0.1.0" py-ocsf-models = "0.8.1" @@ -6754,7 +6714,7 @@ tzlocal = "5.3.1" type = "git" url = "https://github.com/prowler-cloud/prowler.git" reference = "master" -resolved_reference = "3813cd498512df18a7a6139dd183f3c97fcc8848" +resolved_reference = "ceb4691c3657e7db3d178896bfc241d14f194295" [[package]] name = "psutil" diff --git a/api/src/backend/tasks/jobs/attack_paths/db_utils.py b/api/src/backend/tasks/jobs/attack_paths/db_utils.py index c9bbf5afdf..f3b53a48c9 100644 --- a/api/src/backend/tasks/jobs/attack_paths/db_utils.py +++ b/api/src/backend/tasks/jobs/attack_paths/db_utils.py @@ -86,7 +86,11 @@ def finish_attack_paths_scan( ) -> None: with rls_transaction(attack_paths_scan.tenant_id): now = datetime.now(tz=timezone.utc) - duration = int((now - attack_paths_scan.started_at).total_seconds()) + duration = ( + int((now - attack_paths_scan.started_at).total_seconds()) + if attack_paths_scan.started_at + else 0 + ) attack_paths_scan.state = state attack_paths_scan.progress = 100 @@ -144,3 +148,24 @@ def update_old_attack_paths_scan( with rls_transaction(old_attack_paths_scan.tenant_id): old_attack_paths_scan.is_graph_database_deleted = True old_attack_paths_scan.save(update_fields=["is_graph_database_deleted"]) + + +def fail_attack_paths_scan( + tenant_id: str, + scan_id: str, + error: str, +) -> None: + """ + Mark the `AttackPathsScan` row as `FAILED` unless it's already `COMPLETED` or `FAILED`. + Used as a safety net when the Celery task fails outside the job's own error handling. + """ + attack_paths_scan = retrieve_attack_paths_scan(tenant_id, scan_id) + if attack_paths_scan and attack_paths_scan.state not in ( + StateChoices.COMPLETED, + StateChoices.FAILED, + ): + finish_attack_paths_scan( + attack_paths_scan, + StateChoices.FAILED, + {"global_error": error}, + ) diff --git a/api/src/backend/tasks/jobs/attack_paths/scan.py b/api/src/backend/tasks/jobs/attack_paths/scan.py index faf6c4e176..759804b0a4 100644 --- a/api/src/backend/tasks/jobs/attack_paths/scan.py +++ b/api/src/backend/tasks/jobs/attack_paths/scan.py @@ -228,10 +228,16 @@ def run(tenant_id: str, scan_id: str, task_id: str) -> dict[str, Any]: except Exception as e: exception_message = utils.stringify_exception(e, "Cartography failed") logger.error(exception_message) - ingestion_exceptions["global_cartography_error"] = exception_message + ingestion_exceptions["global_error"] = exception_message # Handling databases changes - graph_database.drop_database(tmp_cartography_config.neo4j_database) + try: + graph_database.drop_database(tmp_cartography_config.neo4j_database) + except Exception: + logger.exception( + f"Failed to drop temporary Neo4j database {tmp_cartography_config.neo4j_database} during cleanup" + ) + db_utils.finish_attack_paths_scan( attack_paths_scan, StateChoices.FAILED, ingestion_exceptions ) diff --git a/api/src/backend/tasks/tasks.py b/api/src/backend/tasks/tasks.py index 0dd3b0905b..cbe44ab304 100644 --- a/api/src/backend/tasks/tasks.py +++ b/api/src/backend/tasks/tasks.py @@ -10,6 +10,7 @@ from config.django.base import DJANGO_FINDINGS_BATCH_SIZE, DJANGO_TMP_OUTPUT_DIR from django_celery_beat.models import PeriodicTask from tasks.jobs.attack_paths import ( attack_paths_scan, + db_utils as attack_paths_db_utils, can_provider_run_attack_paths_scan, ) from tasks.jobs.backfill import ( @@ -359,8 +360,25 @@ def perform_scan_summary_task(tenant_id: str, scan_id: str): return aggregate_findings(tenant_id=tenant_id, scan_id=scan_id) +class AttackPathsScanRLSTask(RLSTask): + """ + RLS task that marks the `AttackPathsScan` DB row as `FAILED` when the Celery task fails. + + Covers failures that happen outside the job's own try/except (e.g. provider lookup, + SDK initialization, or Neo4j configuration errors during setup). + """ + + def on_failure(self, exc, task_id, args, kwargs, _einfo): + tenant_id = kwargs.get("tenant_id") + scan_id = kwargs.get("scan_id") + + if tenant_id and scan_id: + logger.error(f"Attack paths scan task {task_id} failed: {exc}") + attack_paths_db_utils.fail_attack_paths_scan(tenant_id, scan_id, str(exc)) + + @shared_task( - base=RLSTask, + base=AttackPathsScanRLSTask, bind=True, name="attack-paths-scan-perform", queue="attack-paths-scans", diff --git a/api/src/backend/tasks/tests/test_attack_paths_scan.py b/api/src/backend/tasks/tests/test_attack_paths_scan.py index dbedc6ae7a..dee0a2d3e1 100644 --- a/api/src/backend/tasks/tests/test_attack_paths_scan.py +++ b/api/src/backend/tasks/tests/test_attack_paths_scan.py @@ -244,9 +244,91 @@ class TestAttackPathsRun: failure_args = mock_finish.call_args[0] assert failure_args[0] is attack_paths_scan assert failure_args[1] == StateChoices.FAILED - assert failure_args[2] == { - "global_cartography_error": "Cartography failed: ingestion boom" - } + assert failure_args[2] == {"global_error": "Cartography failed: ingestion boom"} + + def test_run_failure_marks_scan_failed_even_when_drop_database_fails( + self, tenants_fixture, providers_fixture, scans_fixture + ): + tenant = tenants_fixture[0] + provider = providers_fixture[0] + provider.provider = Provider.ProviderChoices.AWS + provider.save() + scan = scans_fixture[0] + scan.provider = provider + scan.save() + + attack_paths_scan = AttackPathsScan.objects.create( + tenant_id=tenant.id, + provider=provider, + scan=scan, + state=StateChoices.SCHEDULED, + ) + + mock_session = MagicMock() + session_ctx = MagicMock() + session_ctx.__enter__.return_value = mock_session + session_ctx.__exit__.return_value = False + ingestion_fn = MagicMock(side_effect=RuntimeError("ingestion boom")) + + with ( + patch( + "tasks.jobs.attack_paths.scan.rls_transaction", + new=lambda *args, **kwargs: nullcontext(), + ), + patch( + "tasks.jobs.attack_paths.scan.initialize_prowler_provider", + return_value=MagicMock(_enabled_regions=["us-east-1"]), + ), + patch("tasks.jobs.attack_paths.scan.graph_database.get_uri"), + patch( + "tasks.jobs.attack_paths.scan.graph_database.get_database_name", + return_value="db-scan-id", + ), + patch("tasks.jobs.attack_paths.scan.graph_database.create_database"), + patch( + "tasks.jobs.attack_paths.scan.graph_database.get_session", + return_value=session_ctx, + ), + patch("tasks.jobs.attack_paths.scan.cartography_create_indexes.run"), + patch("tasks.jobs.attack_paths.scan.cartography_analysis.run"), + patch("tasks.jobs.attack_paths.scan.findings.create_findings_indexes"), + patch("tasks.jobs.attack_paths.scan.internet.analysis"), + patch("tasks.jobs.attack_paths.scan.findings.analysis"), + patch( + "tasks.jobs.attack_paths.scan.db_utils.retrieve_attack_paths_scan", + return_value=attack_paths_scan, + ), + patch("tasks.jobs.attack_paths.scan.db_utils.starting_attack_paths_scan"), + patch( + "tasks.jobs.attack_paths.scan.db_utils.update_attack_paths_scan_progress" + ), + patch( + "tasks.jobs.attack_paths.scan.db_utils.finish_attack_paths_scan" + ) as mock_finish, + patch( + "tasks.jobs.attack_paths.scan.graph_database.drop_database", + side_effect=ConnectionError("neo4j down"), + ), + patch( + "tasks.jobs.attack_paths.scan.get_cartography_ingestion_function", + return_value=ingestion_fn, + ), + patch( + "tasks.jobs.attack_paths.scan.utils.call_within_event_loop", + side_effect=lambda fn, *a, **kw: fn(*a, **kw), + ), + patch( + "tasks.jobs.attack_paths.scan.utils.stringify_exception", + return_value="Cartography failed: ingestion boom", + ), + ): + with pytest.raises(RuntimeError, match="ingestion boom"): + attack_paths_run(str(tenant.id), str(scan.id), "task-789") + + failure_args = mock_finish.call_args[0] + assert failure_args[0] is attack_paths_scan + assert failure_args[1] == StateChoices.FAILED + assert failure_args[2] == {"global_error": "Cartography failed: ingestion boom"} def test_run_returns_early_for_unsupported_provider(self, tenants_fixture): tenant = tenants_fixture[0] @@ -291,6 +373,142 @@ class TestAttackPathsRun: mock_retrieve.assert_called_once_with(str(tenant.id), str(scan.id)) +@pytest.mark.django_db +class TestFailAttackPathsScan: + def test_marks_executing_scan_as_failed( + self, tenants_fixture, providers_fixture, scans_fixture + ): + from tasks.jobs.attack_paths.db_utils import ( + fail_attack_paths_scan, + ) + + tenant = tenants_fixture[0] + provider = providers_fixture[0] + provider.provider = Provider.ProviderChoices.AWS + provider.save() + scan = scans_fixture[0] + scan.provider = provider + scan.save() + + attack_paths_scan = AttackPathsScan.objects.create( + tenant_id=tenant.id, + provider=provider, + scan=scan, + state=StateChoices.EXECUTING, + ) + + with ( + patch( + "tasks.jobs.attack_paths.db_utils.retrieve_attack_paths_scan", + return_value=attack_paths_scan, + ) as mock_retrieve, + patch( + "tasks.jobs.attack_paths.db_utils.finish_attack_paths_scan" + ) as mock_finish, + ): + fail_attack_paths_scan(str(tenant.id), str(scan.id), "setup exploded") + + mock_retrieve.assert_called_once_with(str(tenant.id), str(scan.id)) + mock_finish.assert_called_once_with( + attack_paths_scan, + StateChoices.FAILED, + {"global_error": "setup exploded"}, + ) + + def test_skips_already_failed_scan( + self, tenants_fixture, providers_fixture, scans_fixture + ): + from tasks.jobs.attack_paths.db_utils import ( + fail_attack_paths_scan, + ) + + tenant = tenants_fixture[0] + provider = providers_fixture[0] + provider.provider = Provider.ProviderChoices.AWS + provider.save() + scan = scans_fixture[0] + scan.provider = provider + scan.save() + + attack_paths_scan = AttackPathsScan.objects.create( + tenant_id=tenant.id, + provider=provider, + scan=scan, + state=StateChoices.FAILED, + ) + + with ( + patch( + "tasks.jobs.attack_paths.db_utils.retrieve_attack_paths_scan", + return_value=attack_paths_scan, + ), + patch( + "tasks.jobs.attack_paths.db_utils.finish_attack_paths_scan" + ) as mock_finish, + ): + fail_attack_paths_scan(str(tenant.id), str(scan.id), "setup exploded") + + mock_finish.assert_not_called() + + def test_skips_when_no_scan_found(self, tenants_fixture): + from tasks.jobs.attack_paths.db_utils import ( + fail_attack_paths_scan, + ) + + tenant = tenants_fixture[0] + + with ( + patch( + "tasks.jobs.attack_paths.db_utils.retrieve_attack_paths_scan", + return_value=None, + ), + patch( + "tasks.jobs.attack_paths.db_utils.finish_attack_paths_scan" + ) as mock_finish, + ): + fail_attack_paths_scan(str(tenant.id), "nonexistent", "setup exploded") + + mock_finish.assert_not_called() + + +class TestAttackPathsScanRLSTaskOnFailure: + def test_on_failure_delegates_to_fail_attack_paths_scan(self): + from tasks.tasks import AttackPathsScanRLSTask + + task = AttackPathsScanRLSTask() + + with patch( + "tasks.tasks.attack_paths_db_utils.fail_attack_paths_scan" + ) as mock_fail: + task.on_failure( + exc=RuntimeError("boom"), + task_id="task-abc", + args=(), + kwargs={"tenant_id": "t-1", "scan_id": "s-1"}, + _einfo=None, + ) + + mock_fail.assert_called_once_with("t-1", "s-1", "boom") + + def test_on_failure_skips_when_missing_kwargs(self): + from tasks.tasks import AttackPathsScanRLSTask + + task = AttackPathsScanRLSTask() + + with patch( + "tasks.tasks.attack_paths_db_utils.fail_attack_paths_scan" + ) as mock_fail: + task.on_failure( + exc=RuntimeError("boom"), + task_id="task-abc", + args=(), + kwargs={}, + _einfo=None, + ) + + mock_fail.assert_not_called() + + @pytest.mark.django_db class TestAttackPathsFindingsHelpers: def test_create_findings_indexes_executes_all_statements(self):