From c3ce3d2b3c15592a6bc1f04e1c1684b95bd88a27 Mon Sep 17 00:00:00 2001 From: Josema Camacho Date: Tue, 30 Jun 2026 17:01:48 +0200 Subject: [PATCH] fix(api): preflight attack paths graph databases (#11743) --- api/CHANGELOG.md | 1 + .../api/attack_paths/cypher_sanitizer.py | 5 ++ api/src/backend/api/attack_paths/database.py | 25 +++++++ .../api/tests/test_attack_paths_database.py | 66 +++++++++++++++++++ .../api/tests/test_cypher_sanitizer.py | 8 +++ .../backend/tasks/jobs/attack_paths/scan.py | 2 + .../tasks/tests/test_attack_paths_scan.py | 65 ++++++++++++++++++ 7 files changed, 172 insertions(+) diff --git a/api/CHANGELOG.md b/api/CHANGELOG.md index f7c6d37b28..067bb192a6 100644 --- a/api/CHANGELOG.md +++ b/api/CHANGELOG.md @@ -7,6 +7,7 @@ All notable changes to the **Prowler API** are documented in this file. ### 🔄 Changed - Attack Paths: AWS Neptune is now supported as a persistent sink database, selectable via `ATTACK_PATHS_SINK_DATABASE=neptune` (default `neo4j`), Cartography's (bumped to 0.138.1) per-scan ingest database stays on Neo4j [(#11524)](https://github.com/prowler-cloud/prowler/pull/11524) +- Attack Paths: Scan task now checks the ingest Neo4j database and configured graph sink before starting graph ingestion [(#11743)](https://github.com/prowler-cloud/prowler/pull/11743) --- diff --git a/api/src/backend/api/attack_paths/cypher_sanitizer.py b/api/src/backend/api/attack_paths/cypher_sanitizer.py index 7d7c93c680..35752b3ec9 100644 --- a/api/src/backend/api/attack_paths/cypher_sanitizer.py +++ b/api/src/backend/api/attack_paths/cypher_sanitizer.py @@ -96,6 +96,11 @@ def inject_provider_label(cypher: str, provider_id: str) -> str: node pattern. """ label = get_provider_label(provider_id) + return inject_label(cypher, label) + + +def inject_label(cypher: str, label: str) -> str: + """Rewrite a Cypher query to append a label to every node pattern.""" # Step 1: Protect strings and comments (single pass, leftmost-first) protected: list[str] = [] diff --git a/api/src/backend/api/attack_paths/database.py b/api/src/backend/api/attack_paths/database.py index 6148b42bcc..3a33b964b7 100644 --- a/api/src/backend/api/attack_paths/database.py +++ b/api/src/backend/api/attack_paths/database.py @@ -106,6 +106,31 @@ def verify_connectivity() -> None: sink_module.get_backend().verify_connectivity() +def verify_scan_databases_available() -> None: + """Raise if either graph database needed by an Attack Paths scan is unavailable.""" + errors: list[str] = [] + first_error: Exception | None = None + + try: + ingest.get_driver().verify_connectivity() + except Exception as exc: + errors.append(f"ingest Neo4j: {exc}") + first_error = exc + + try: + get_driver().verify_connectivity() + except Exception as exc: + errors.append(f"sink {settings.ATTACK_PATHS_SINK_DATABASE}: {exc}") + if first_error is None: + first_error = exc + + if errors: + raise RuntimeError( + "Attack Paths graph database unavailable before scan start: " + + "; ".join(errors) + ) from first_error + + def get_uri() -> str: """Return the sink URI. Retained for backwards compatibility.""" if settings.ATTACK_PATHS_SINK_DATABASE == "neptune": diff --git a/api/src/backend/api/tests/test_attack_paths_database.py b/api/src/backend/api/tests/test_attack_paths_database.py index 049f122335..c4aca45928 100644 --- a/api/src/backend/api/tests/test_attack_paths_database.py +++ b/api/src/backend/api/tests/test_attack_paths_database.py @@ -10,6 +10,7 @@ hierarchy; sink-internal behavior is exercised in `test_sink.py`. from unittest.mock import MagicMock, patch import api.attack_paths.database as db_module +import pytest class TestDatabaseNameHelper: @@ -72,6 +73,71 @@ class TestExecuteReadQueryRoutes: ) +class TestScanDatabaseAvailability: + def test_verify_scan_databases_available_checks_ingest_and_sink(self): + with ( + patch("api.attack_paths.database.ingest") as mock_ingest, + patch("api.attack_paths.database.get_driver") as mock_get_driver, + ): + db_module.verify_scan_databases_available() + + mock_ingest.get_driver.return_value.verify_connectivity.assert_called_once_with() + mock_get_driver.return_value.verify_connectivity.assert_called_once_with() + + def test_verify_scan_databases_available_raises_when_ingest_is_down(self): + with ( + patch("api.attack_paths.database.ingest") as mock_ingest, + patch("api.attack_paths.database.get_driver"), + ): + mock_ingest.get_driver.return_value.verify_connectivity.side_effect = ( + RuntimeError("ingest down") + ) + + with pytest.raises(RuntimeError) as exc: + db_module.verify_scan_databases_available() + + assert "Attack Paths graph database unavailable before scan start" in str( + exc.value + ) + assert "ingest Neo4j: ingest down" in str(exc.value) + + def test_verify_scan_databases_available_raises_when_sink_is_down(self, settings): + settings.ATTACK_PATHS_SINK_DATABASE = "neptune" + + with ( + patch("api.attack_paths.database.ingest"), + patch("api.attack_paths.database.get_driver") as mock_get_driver, + ): + mock_get_driver.return_value.verify_connectivity.side_effect = RuntimeError( + "writer down" + ) + + with pytest.raises(RuntimeError) as exc: + db_module.verify_scan_databases_available() + + assert "sink neptune: writer down" in str(exc.value) + + def test_verify_scan_databases_available_reports_both_failures(self, settings): + settings.ATTACK_PATHS_SINK_DATABASE = "neo4j" + + with ( + patch("api.attack_paths.database.ingest") as mock_ingest, + patch("api.attack_paths.database.get_driver") as mock_get_driver, + ): + mock_ingest.get_driver.return_value.verify_connectivity.side_effect = ( + RuntimeError("ingest down") + ) + mock_get_driver.return_value.verify_connectivity.side_effect = RuntimeError( + "sink down" + ) + + with pytest.raises(RuntimeError) as exc: + db_module.verify_scan_databases_available() + + assert "ingest Neo4j: ingest down" in str(exc.value) + assert "sink neo4j: sink down" in str(exc.value) + + class TestSinkOperationsDelegation: def test_has_provider_data_delegates_to_sink(self, sink_backend_stub): sink_backend_stub.has_provider_data.return_value = True diff --git a/api/src/backend/api/tests/test_cypher_sanitizer.py b/api/src/backend/api/tests/test_cypher_sanitizer.py index 6caca47a56..c0d4f9b7ff 100644 --- a/api/src/backend/api/tests/test_cypher_sanitizer.py +++ b/api/src/backend/api/tests/test_cypher_sanitizer.py @@ -4,6 +4,7 @@ from unittest.mock import patch import pytest from api.attack_paths.cypher_sanitizer import ( + inject_label, inject_provider_label, validate_custom_query, ) @@ -21,6 +22,13 @@ def _inject(cypher: str) -> str: return inject_provider_label(cypher, PROVIDER_ID) +def test_generic_inject_label_reuses_provider_injection_pipeline(): + result = inject_label("MATCH (n:AWSRole)--(m) RETURN n, m", "_Tenant_test") + + assert "(n:AWSRole:_Tenant_test)" in result + assert "(m:_Tenant_test)" in result + + # --------------------------------------------------------------------------- # Pass A - Labeled node patterns (all clauses) # --------------------------------------------------------------------------- diff --git a/api/src/backend/tasks/jobs/attack_paths/scan.py b/api/src/backend/tasks/jobs/attack_paths/scan.py index 13390f09fb..32337c6832 100644 --- a/api/src/backend/tasks/jobs/attack_paths/scan.py +++ b/api/src/backend/tasks/jobs/attack_paths/scan.py @@ -171,6 +171,8 @@ def run(tenant_id: str, scan_id: str, task_id: str) -> dict[str, Any]: update_tag=tmp_cartography_config.update_tag, ) + graph_database.verify_scan_databases_available() + # Starting the Attack Paths scan if not db_utils.starting_attack_paths_scan( attack_paths_scan, tenant_cartography_config diff --git a/api/src/backend/tasks/tests/test_attack_paths_scan.py b/api/src/backend/tasks/tests/test_attack_paths_scan.py index ab77a5af67..01c50c9522 100644 --- a/api/src/backend/tasks/tests/test_attack_paths_scan.py +++ b/api/src/backend/tasks/tests/test_attack_paths_scan.py @@ -34,6 +34,13 @@ SYNC_RESULT_EMPTY = { @pytest.mark.django_db class TestAttackPathsRun: + @pytest.fixture(autouse=True) + def mock_graph_database_preflight(self): + with patch( + "tasks.jobs.attack_paths.scan.graph_database.verify_scan_databases_available" + ) as mock_preflight: + yield mock_preflight + # Patching with decorators as we got a `SyntaxError: too many statically nested blocks` error if we use context managers @patch("tasks.jobs.attack_paths.scan.graph_database.drop_database") @patch( @@ -190,6 +197,64 @@ class TestAttackPathsRun: # don't switch to the new catalog/sink before the graph is live. mock_set_scan_migrated.assert_called_once_with(attack_paths_scan, True, "neo4j") + def test_run_preflight_failure_does_not_start_scan( + self, + mock_graph_database_preflight, + tenants_fixture, + providers_fixture, + scans_fixture, + ): + tenant = tenants_fixture[0] + provider = providers_fixture[0] + provider.provider = Provider.ProviderChoices.AWS + provider.save() + scan = scans_fixture[0] + scan.provider = provider + scan.save() + + attack_paths_scan = AttackPathsScan.objects.create( + tenant_id=tenant.id, + provider=provider, + scan=scan, + state=StateChoices.SCHEDULED, + ) + mock_graph_database_preflight.side_effect = RuntimeError("graph unavailable") + + with ( + patch( + "tasks.jobs.attack_paths.scan.rls_transaction", + new=lambda *args, **kwargs: nullcontext(), + ), + patch( + "tasks.jobs.attack_paths.scan.initialize_prowler_provider", + return_value=MagicMock(_enabled_regions=["us-east-1"]), + ), + patch( + "tasks.jobs.attack_paths.scan.graph_database.get_ingest_uri", + return_value="bolt://neo4j", + ), + patch( + "tasks.jobs.attack_paths.scan.db_utils.retrieve_attack_paths_scan", + return_value=attack_paths_scan, + ), + patch( + "tasks.jobs.attack_paths.scan.get_cartography_ingestion_function", + return_value=MagicMock(return_value={}), + ), + patch( + "tasks.jobs.attack_paths.scan.db_utils.starting_attack_paths_scan" + ) as mock_starting, + patch( + "tasks.jobs.attack_paths.scan.graph_database.create_database" + ) as mock_create_db, + ): + with pytest.raises(RuntimeError, match="graph unavailable"): + attack_paths_run(str(tenant.id), str(scan.id), "task-123") + + mock_graph_database_preflight.assert_called_once_with() + mock_starting.assert_not_called() + mock_create_db.assert_not_called() + @patch( "tasks.jobs.attack_paths.scan.utils.stringify_exception", return_value="Cartography failed: ingestion boom",