fix(api): mark attack paths scan as failed when celery task fails (#10065)

This commit is contained in:
Josema Camacho
2026-02-13 13:20:38 +01:00
committed by GitHub
parent 3defbcd386
commit 02b58d8a31
6 changed files with 283 additions and 55 deletions

View File

@@ -18,6 +18,7 @@ All notable changes to the **Prowler API** are documented in this file.
- Support CSA CCM 4.0 for the Azure provider [(#10039)](https://github.com/prowler-cloud/prowler/pull/10039)
- Support CSA CCM 4.0 for the Oracle Cloud provider [(#10057)](https://github.com/prowler-cloud/prowler/pull/10057)
- Support CSA CCM 4.0 for the Alibaba Cloud provider [(#10061)](https://github.com/prowler-cloud/prowler/pull/10061)
- Attack Paths: Mark attack Paths scan as failed when Celery task fails outside job error handling [(#10065)](https://github.com/prowler-cloud/prowler/pull/10065)
### 🔐 Security

56
api/poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.4 and should not be changed by hand.
[[package]]
name = "about-time"
@@ -5836,46 +5836,6 @@ files = [
{file = "nest_asyncio-1.6.0.tar.gz", hash = "sha256:6f172d5449aca15afd6c646851f4e31e02c598d553a667e38cafa997cfec55fe"},
]
[[package]]
name = "netifaces"
version = "0.11.0"
description = "Portable network interface information."
optional = false
python-versions = "*"
groups = ["main"]
files = [
{file = "netifaces-0.11.0-cp27-cp27m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:eb4813b77d5df99903af4757ce980a98c4d702bbcb81f32a0b305a1537bdf0b1"},
{file = "netifaces-0.11.0-cp27-cp27m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:5f9ca13babe4d845e400921973f6165a4c2f9f3379c7abfc7478160e25d196a4"},
{file = "netifaces-0.11.0-cp27-cp27m-win32.whl", hash = "sha256:7dbb71ea26d304e78ccccf6faccef71bb27ea35e259fb883cfd7fd7b4f17ecb1"},
{file = "netifaces-0.11.0-cp27-cp27m-win_amd64.whl", hash = "sha256:0f6133ac02521270d9f7c490f0c8c60638ff4aec8338efeff10a1b51506abe85"},
{file = "netifaces-0.11.0-cp27-cp27mu-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:08e3f102a59f9eaef70948340aeb6c89bd09734e0dca0f3b82720305729f63ea"},
{file = "netifaces-0.11.0-cp27-cp27mu-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:c03fb2d4ef4e393f2e6ffc6376410a22a3544f164b336b3a355226653e5efd89"},
{file = "netifaces-0.11.0-cp34-cp34m-win32.whl", hash = "sha256:73ff21559675150d31deea8f1f8d7e9a9a7e4688732a94d71327082f517fc6b4"},
{file = "netifaces-0.11.0-cp35-cp35m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:815eafdf8b8f2e61370afc6add6194bd5a7252ae44c667e96c4c1ecf418811e4"},
{file = "netifaces-0.11.0-cp35-cp35m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:50721858c935a76b83dd0dd1ab472cad0a3ef540a1408057624604002fcfb45b"},
{file = "netifaces-0.11.0-cp35-cp35m-win32.whl", hash = "sha256:c9a3a47cd3aaeb71e93e681d9816c56406ed755b9442e981b07e3618fb71d2ac"},
{file = "netifaces-0.11.0-cp36-cp36m-macosx_10_15_x86_64.whl", hash = "sha256:aab1dbfdc55086c789f0eb37affccf47b895b98d490738b81f3b2360100426be"},
{file = "netifaces-0.11.0-cp36-cp36m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:c37a1ca83825bc6f54dddf5277e9c65dec2f1b4d0ba44b8fd42bc30c91aa6ea1"},
{file = "netifaces-0.11.0-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:28f4bf3a1361ab3ed93c5ef360c8b7d4a4ae060176a3529e72e5e4ffc4afd8b0"},
{file = "netifaces-0.11.0-cp36-cp36m-win32.whl", hash = "sha256:2650beee182fed66617e18474b943e72e52f10a24dc8cac1db36c41ee9c041b7"},
{file = "netifaces-0.11.0-cp36-cp36m-win_amd64.whl", hash = "sha256:cb925e1ca024d6f9b4f9b01d83215fd00fe69d095d0255ff3f64bffda74025c8"},
{file = "netifaces-0.11.0-cp37-cp37m-macosx_10_15_x86_64.whl", hash = "sha256:84e4d2e6973eccc52778735befc01638498781ce0e39aa2044ccfd2385c03246"},
{file = "netifaces-0.11.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:18917fbbdcb2d4f897153c5ddbb56b31fa6dd7c3fa9608b7e3c3a663df8206b5"},
{file = "netifaces-0.11.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:48324183af7f1bc44f5f197f3dad54a809ad1ef0c78baee2c88f16a5de02c4c9"},
{file = "netifaces-0.11.0-cp37-cp37m-win32.whl", hash = "sha256:8f7da24eab0d4184715d96208b38d373fd15c37b0dafb74756c638bd619ba150"},
{file = "netifaces-0.11.0-cp37-cp37m-win_amd64.whl", hash = "sha256:2479bb4bb50968089a7c045f24d120f37026d7e802ec134c4490eae994c729b5"},
{file = "netifaces-0.11.0-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:3ecb3f37c31d5d51d2a4d935cfa81c9bc956687c6f5237021b36d6fdc2815b2c"},
{file = "netifaces-0.11.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:96c0fe9696398253f93482c84814f0e7290eee0bfec11563bd07d80d701280c3"},
{file = "netifaces-0.11.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:c92ff9ac7c2282009fe0dcb67ee3cd17978cffbe0c8f4b471c00fe4325c9b4d4"},
{file = "netifaces-0.11.0-cp38-cp38-win32.whl", hash = "sha256:d07b01c51b0b6ceb0f09fc48ec58debd99d2c8430b09e56651addeaf5de48048"},
{file = "netifaces-0.11.0-cp38-cp38-win_amd64.whl", hash = "sha256:469fc61034f3daf095e02f9f1bbac07927b826c76b745207287bc594884cfd05"},
{file = "netifaces-0.11.0-cp39-cp39-macosx_10_15_x86_64.whl", hash = "sha256:5be83986100ed1fdfa78f11ccff9e4757297735ac17391b95e17e74335c2047d"},
{file = "netifaces-0.11.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:54ff6624eb95b8a07e79aa8817288659af174e954cca24cdb0daeeddfc03c4ff"},
{file = "netifaces-0.11.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:841aa21110a20dc1621e3dd9f922c64ca64dd1eb213c47267a2c324d823f6c8f"},
{file = "netifaces-0.11.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:e76c7f351e0444721e85f975ae92718e21c1f361bda946d60a214061de1f00a1"},
{file = "netifaces-0.11.0.tar.gz", hash = "sha256:043a79146eb2907edf439899f262b3dfe41717d34124298ed281139a8b93ca32"},
]
[[package]]
name = "nltk"
version = "3.9.2"
@@ -6043,14 +6003,14 @@ voice-helpers = ["numpy (>=2.0.2)", "sounddevice (>=0.5.1)"]
[[package]]
name = "openstacksdk"
version = "4.0.1"
version = "4.2.0"
description = "An SDK for building applications to work with OpenStack"
optional = false
python-versions = ">=3.8"
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "openstacksdk-4.0.1-py3-none-any.whl", hash = "sha256:d63187a006fff7c1de1486c9e2e1073a787af402620c3c0ed0cf5291225998ac"},
{file = "openstacksdk-4.0.1.tar.gz", hash = "sha256:19faa1d5e6a78a2c1dc06a171e65e776ba82e9df23e1d08586225dc5ade9fc63"},
{file = "openstacksdk-4.2.0-py3-none-any.whl", hash = "sha256:238be0fa5d9899872b00787ab38e84f92fd6dc87525fde0965dadcdc12196dc6"},
{file = "openstacksdk-4.2.0.tar.gz", hash = "sha256:5cb9450dcce8054a2caf89d8be9e55057ddfa219a954e781032241eb29280445"},
]
[package.dependencies]
@@ -6061,10 +6021,10 @@ iso8601 = ">=0.1.11"
jmespath = ">=0.9.0"
jsonpatch = ">=1.16,<1.20 || >1.20"
keystoneauth1 = ">=3.18.0"
netifaces = ">=0.10.4"
os-service-types = ">=1.7.0"
pbr = ">=2.0.0,<2.1.0 || >2.1.0"
platformdirs = ">=3"
psutil = ">=3.2.2"
PyYAML = ">=3.13"
requestsexceptions = ">=1.2.0"
@@ -6736,7 +6696,7 @@ microsoft-kiota-abstractions = "1.9.2"
msgraph-sdk = "1.23.0"
numpy = "2.0.2"
oci = "2.160.3"
openstacksdk = "4.0.1"
openstacksdk = "4.2.0"
pandas = "2.2.3"
py-iam-expand = "0.1.0"
py-ocsf-models = "0.8.1"
@@ -6754,7 +6714,7 @@ tzlocal = "5.3.1"
type = "git"
url = "https://github.com/prowler-cloud/prowler.git"
reference = "master"
resolved_reference = "3813cd498512df18a7a6139dd183f3c97fcc8848"
resolved_reference = "ceb4691c3657e7db3d178896bfc241d14f194295"
[[package]]
name = "psutil"

View File

@@ -86,7 +86,11 @@ def finish_attack_paths_scan(
) -> None:
with rls_transaction(attack_paths_scan.tenant_id):
now = datetime.now(tz=timezone.utc)
duration = int((now - attack_paths_scan.started_at).total_seconds())
duration = (
int((now - attack_paths_scan.started_at).total_seconds())
if attack_paths_scan.started_at
else 0
)
attack_paths_scan.state = state
attack_paths_scan.progress = 100
@@ -144,3 +148,24 @@ def update_old_attack_paths_scan(
with rls_transaction(old_attack_paths_scan.tenant_id):
old_attack_paths_scan.is_graph_database_deleted = True
old_attack_paths_scan.save(update_fields=["is_graph_database_deleted"])
def fail_attack_paths_scan(
tenant_id: str,
scan_id: str,
error: str,
) -> None:
"""
Mark the `AttackPathsScan` row as `FAILED` unless it's already `COMPLETED` or `FAILED`.
Used as a safety net when the Celery task fails outside the job's own error handling.
"""
attack_paths_scan = retrieve_attack_paths_scan(tenant_id, scan_id)
if attack_paths_scan and attack_paths_scan.state not in (
StateChoices.COMPLETED,
StateChoices.FAILED,
):
finish_attack_paths_scan(
attack_paths_scan,
StateChoices.FAILED,
{"global_error": error},
)

View File

@@ -228,10 +228,16 @@ def run(tenant_id: str, scan_id: str, task_id: str) -> dict[str, Any]:
except Exception as e:
exception_message = utils.stringify_exception(e, "Cartography failed")
logger.error(exception_message)
ingestion_exceptions["global_cartography_error"] = exception_message
ingestion_exceptions["global_error"] = exception_message
# Handling databases changes
try:
graph_database.drop_database(tmp_cartography_config.neo4j_database)
except Exception:
logger.exception(
f"Failed to drop temporary Neo4j database {tmp_cartography_config.neo4j_database} during cleanup"
)
db_utils.finish_attack_paths_scan(
attack_paths_scan, StateChoices.FAILED, ingestion_exceptions
)

View File

@@ -10,6 +10,7 @@ from config.django.base import DJANGO_FINDINGS_BATCH_SIZE, DJANGO_TMP_OUTPUT_DIR
from django_celery_beat.models import PeriodicTask
from tasks.jobs.attack_paths import (
attack_paths_scan,
db_utils as attack_paths_db_utils,
can_provider_run_attack_paths_scan,
)
from tasks.jobs.backfill import (
@@ -359,8 +360,25 @@ def perform_scan_summary_task(tenant_id: str, scan_id: str):
return aggregate_findings(tenant_id=tenant_id, scan_id=scan_id)
class AttackPathsScanRLSTask(RLSTask):
"""
RLS task that marks the `AttackPathsScan` DB row as `FAILED` when the Celery task fails.
Covers failures that happen outside the job's own try/except (e.g. provider lookup,
SDK initialization, or Neo4j configuration errors during setup).
"""
def on_failure(self, exc, task_id, args, kwargs, _einfo):
tenant_id = kwargs.get("tenant_id")
scan_id = kwargs.get("scan_id")
if tenant_id and scan_id:
logger.error(f"Attack paths scan task {task_id} failed: {exc}")
attack_paths_db_utils.fail_attack_paths_scan(tenant_id, scan_id, str(exc))
@shared_task(
base=RLSTask,
base=AttackPathsScanRLSTask,
bind=True,
name="attack-paths-scan-perform",
queue="attack-paths-scans",

View File

@@ -244,9 +244,91 @@ class TestAttackPathsRun:
failure_args = mock_finish.call_args[0]
assert failure_args[0] is attack_paths_scan
assert failure_args[1] == StateChoices.FAILED
assert failure_args[2] == {
"global_cartography_error": "Cartography failed: ingestion boom"
}
assert failure_args[2] == {"global_error": "Cartography failed: ingestion boom"}
def test_run_failure_marks_scan_failed_even_when_drop_database_fails(
self, tenants_fixture, providers_fixture, scans_fixture
):
tenant = tenants_fixture[0]
provider = providers_fixture[0]
provider.provider = Provider.ProviderChoices.AWS
provider.save()
scan = scans_fixture[0]
scan.provider = provider
scan.save()
attack_paths_scan = AttackPathsScan.objects.create(
tenant_id=tenant.id,
provider=provider,
scan=scan,
state=StateChoices.SCHEDULED,
)
mock_session = MagicMock()
session_ctx = MagicMock()
session_ctx.__enter__.return_value = mock_session
session_ctx.__exit__.return_value = False
ingestion_fn = MagicMock(side_effect=RuntimeError("ingestion boom"))
with (
patch(
"tasks.jobs.attack_paths.scan.rls_transaction",
new=lambda *args, **kwargs: nullcontext(),
),
patch(
"tasks.jobs.attack_paths.scan.initialize_prowler_provider",
return_value=MagicMock(_enabled_regions=["us-east-1"]),
),
patch("tasks.jobs.attack_paths.scan.graph_database.get_uri"),
patch(
"tasks.jobs.attack_paths.scan.graph_database.get_database_name",
return_value="db-scan-id",
),
patch("tasks.jobs.attack_paths.scan.graph_database.create_database"),
patch(
"tasks.jobs.attack_paths.scan.graph_database.get_session",
return_value=session_ctx,
),
patch("tasks.jobs.attack_paths.scan.cartography_create_indexes.run"),
patch("tasks.jobs.attack_paths.scan.cartography_analysis.run"),
patch("tasks.jobs.attack_paths.scan.findings.create_findings_indexes"),
patch("tasks.jobs.attack_paths.scan.internet.analysis"),
patch("tasks.jobs.attack_paths.scan.findings.analysis"),
patch(
"tasks.jobs.attack_paths.scan.db_utils.retrieve_attack_paths_scan",
return_value=attack_paths_scan,
),
patch("tasks.jobs.attack_paths.scan.db_utils.starting_attack_paths_scan"),
patch(
"tasks.jobs.attack_paths.scan.db_utils.update_attack_paths_scan_progress"
),
patch(
"tasks.jobs.attack_paths.scan.db_utils.finish_attack_paths_scan"
) as mock_finish,
patch(
"tasks.jobs.attack_paths.scan.graph_database.drop_database",
side_effect=ConnectionError("neo4j down"),
),
patch(
"tasks.jobs.attack_paths.scan.get_cartography_ingestion_function",
return_value=ingestion_fn,
),
patch(
"tasks.jobs.attack_paths.scan.utils.call_within_event_loop",
side_effect=lambda fn, *a, **kw: fn(*a, **kw),
),
patch(
"tasks.jobs.attack_paths.scan.utils.stringify_exception",
return_value="Cartography failed: ingestion boom",
),
):
with pytest.raises(RuntimeError, match="ingestion boom"):
attack_paths_run(str(tenant.id), str(scan.id), "task-789")
failure_args = mock_finish.call_args[0]
assert failure_args[0] is attack_paths_scan
assert failure_args[1] == StateChoices.FAILED
assert failure_args[2] == {"global_error": "Cartography failed: ingestion boom"}
def test_run_returns_early_for_unsupported_provider(self, tenants_fixture):
tenant = tenants_fixture[0]
@@ -291,6 +373,142 @@ class TestAttackPathsRun:
mock_retrieve.assert_called_once_with(str(tenant.id), str(scan.id))
@pytest.mark.django_db
class TestFailAttackPathsScan:
def test_marks_executing_scan_as_failed(
self, tenants_fixture, providers_fixture, scans_fixture
):
from tasks.jobs.attack_paths.db_utils import (
fail_attack_paths_scan,
)
tenant = tenants_fixture[0]
provider = providers_fixture[0]
provider.provider = Provider.ProviderChoices.AWS
provider.save()
scan = scans_fixture[0]
scan.provider = provider
scan.save()
attack_paths_scan = AttackPathsScan.objects.create(
tenant_id=tenant.id,
provider=provider,
scan=scan,
state=StateChoices.EXECUTING,
)
with (
patch(
"tasks.jobs.attack_paths.db_utils.retrieve_attack_paths_scan",
return_value=attack_paths_scan,
) as mock_retrieve,
patch(
"tasks.jobs.attack_paths.db_utils.finish_attack_paths_scan"
) as mock_finish,
):
fail_attack_paths_scan(str(tenant.id), str(scan.id), "setup exploded")
mock_retrieve.assert_called_once_with(str(tenant.id), str(scan.id))
mock_finish.assert_called_once_with(
attack_paths_scan,
StateChoices.FAILED,
{"global_error": "setup exploded"},
)
def test_skips_already_failed_scan(
self, tenants_fixture, providers_fixture, scans_fixture
):
from tasks.jobs.attack_paths.db_utils import (
fail_attack_paths_scan,
)
tenant = tenants_fixture[0]
provider = providers_fixture[0]
provider.provider = Provider.ProviderChoices.AWS
provider.save()
scan = scans_fixture[0]
scan.provider = provider
scan.save()
attack_paths_scan = AttackPathsScan.objects.create(
tenant_id=tenant.id,
provider=provider,
scan=scan,
state=StateChoices.FAILED,
)
with (
patch(
"tasks.jobs.attack_paths.db_utils.retrieve_attack_paths_scan",
return_value=attack_paths_scan,
),
patch(
"tasks.jobs.attack_paths.db_utils.finish_attack_paths_scan"
) as mock_finish,
):
fail_attack_paths_scan(str(tenant.id), str(scan.id), "setup exploded")
mock_finish.assert_not_called()
def test_skips_when_no_scan_found(self, tenants_fixture):
from tasks.jobs.attack_paths.db_utils import (
fail_attack_paths_scan,
)
tenant = tenants_fixture[0]
with (
patch(
"tasks.jobs.attack_paths.db_utils.retrieve_attack_paths_scan",
return_value=None,
),
patch(
"tasks.jobs.attack_paths.db_utils.finish_attack_paths_scan"
) as mock_finish,
):
fail_attack_paths_scan(str(tenant.id), "nonexistent", "setup exploded")
mock_finish.assert_not_called()
class TestAttackPathsScanRLSTaskOnFailure:
def test_on_failure_delegates_to_fail_attack_paths_scan(self):
from tasks.tasks import AttackPathsScanRLSTask
task = AttackPathsScanRLSTask()
with patch(
"tasks.tasks.attack_paths_db_utils.fail_attack_paths_scan"
) as mock_fail:
task.on_failure(
exc=RuntimeError("boom"),
task_id="task-abc",
args=(),
kwargs={"tenant_id": "t-1", "scan_id": "s-1"},
_einfo=None,
)
mock_fail.assert_called_once_with("t-1", "s-1", "boom")
def test_on_failure_skips_when_missing_kwargs(self):
from tasks.tasks import AttackPathsScanRLSTask
task = AttackPathsScanRLSTask()
with patch(
"tasks.tasks.attack_paths_db_utils.fail_attack_paths_scan"
) as mock_fail:
task.on_failure(
exc=RuntimeError("boom"),
task_id="task-abc",
args=(),
kwargs={},
_einfo=None,
)
mock_fail.assert_not_called()
@pytest.mark.django_db
class TestAttackPathsFindingsHelpers:
def test_create_findings_indexes_executes_all_statements(self):