fix(api): preflight attack paths graph databases (#11743)

This commit is contained in:
Josema Camacho
2026-06-30 17:01:48 +02:00
committed by GitHub
parent c46cbaaa4a
commit c3ce3d2b3c
7 changed files with 172 additions and 0 deletions
+1
View File
@@ -7,6 +7,7 @@ All notable changes to the **Prowler API** are documented in this file.
### 🔄 Changed
- Attack Paths: AWS Neptune is now supported as a persistent sink database, selectable via `ATTACK_PATHS_SINK_DATABASE=neptune` (default `neo4j`), Cartography's (bumped to 0.138.1) per-scan ingest database stays on Neo4j [(#11524)](https://github.com/prowler-cloud/prowler/pull/11524)
- Attack Paths: Scan task now checks the ingest Neo4j database and configured graph sink before starting graph ingestion [(#11743)](https://github.com/prowler-cloud/prowler/pull/11743)
---
@@ -96,6 +96,11 @@ def inject_provider_label(cypher: str, provider_id: str) -> str:
node pattern.
"""
label = get_provider_label(provider_id)
return inject_label(cypher, label)
def inject_label(cypher: str, label: str) -> str:
"""Rewrite a Cypher query to append a label to every node pattern."""
# Step 1: Protect strings and comments (single pass, leftmost-first)
protected: list[str] = []
@@ -106,6 +106,31 @@ def verify_connectivity() -> None:
sink_module.get_backend().verify_connectivity()
def verify_scan_databases_available() -> None:
"""Raise if either graph database needed by an Attack Paths scan is unavailable."""
errors: list[str] = []
first_error: Exception | None = None
try:
ingest.get_driver().verify_connectivity()
except Exception as exc:
errors.append(f"ingest Neo4j: {exc}")
first_error = exc
try:
get_driver().verify_connectivity()
except Exception as exc:
errors.append(f"sink {settings.ATTACK_PATHS_SINK_DATABASE}: {exc}")
if first_error is None:
first_error = exc
if errors:
raise RuntimeError(
"Attack Paths graph database unavailable before scan start: "
+ "; ".join(errors)
) from first_error
def get_uri() -> str:
"""Return the sink URI. Retained for backwards compatibility."""
if settings.ATTACK_PATHS_SINK_DATABASE == "neptune":
@@ -10,6 +10,7 @@ hierarchy; sink-internal behavior is exercised in `test_sink.py`.
from unittest.mock import MagicMock, patch
import api.attack_paths.database as db_module
import pytest
class TestDatabaseNameHelper:
@@ -72,6 +73,71 @@ class TestExecuteReadQueryRoutes:
)
class TestScanDatabaseAvailability:
def test_verify_scan_databases_available_checks_ingest_and_sink(self):
with (
patch("api.attack_paths.database.ingest") as mock_ingest,
patch("api.attack_paths.database.get_driver") as mock_get_driver,
):
db_module.verify_scan_databases_available()
mock_ingest.get_driver.return_value.verify_connectivity.assert_called_once_with()
mock_get_driver.return_value.verify_connectivity.assert_called_once_with()
def test_verify_scan_databases_available_raises_when_ingest_is_down(self):
with (
patch("api.attack_paths.database.ingest") as mock_ingest,
patch("api.attack_paths.database.get_driver"),
):
mock_ingest.get_driver.return_value.verify_connectivity.side_effect = (
RuntimeError("ingest down")
)
with pytest.raises(RuntimeError) as exc:
db_module.verify_scan_databases_available()
assert "Attack Paths graph database unavailable before scan start" in str(
exc.value
)
assert "ingest Neo4j: ingest down" in str(exc.value)
def test_verify_scan_databases_available_raises_when_sink_is_down(self, settings):
settings.ATTACK_PATHS_SINK_DATABASE = "neptune"
with (
patch("api.attack_paths.database.ingest"),
patch("api.attack_paths.database.get_driver") as mock_get_driver,
):
mock_get_driver.return_value.verify_connectivity.side_effect = RuntimeError(
"writer down"
)
with pytest.raises(RuntimeError) as exc:
db_module.verify_scan_databases_available()
assert "sink neptune: writer down" in str(exc.value)
def test_verify_scan_databases_available_reports_both_failures(self, settings):
settings.ATTACK_PATHS_SINK_DATABASE = "neo4j"
with (
patch("api.attack_paths.database.ingest") as mock_ingest,
patch("api.attack_paths.database.get_driver") as mock_get_driver,
):
mock_ingest.get_driver.return_value.verify_connectivity.side_effect = (
RuntimeError("ingest down")
)
mock_get_driver.return_value.verify_connectivity.side_effect = RuntimeError(
"sink down"
)
with pytest.raises(RuntimeError) as exc:
db_module.verify_scan_databases_available()
assert "ingest Neo4j: ingest down" in str(exc.value)
assert "sink neo4j: sink down" in str(exc.value)
class TestSinkOperationsDelegation:
def test_has_provider_data_delegates_to_sink(self, sink_backend_stub):
sink_backend_stub.has_provider_data.return_value = True
@@ -4,6 +4,7 @@ from unittest.mock import patch
import pytest
from api.attack_paths.cypher_sanitizer import (
inject_label,
inject_provider_label,
validate_custom_query,
)
@@ -21,6 +22,13 @@ def _inject(cypher: str) -> str:
return inject_provider_label(cypher, PROVIDER_ID)
def test_generic_inject_label_reuses_provider_injection_pipeline():
result = inject_label("MATCH (n:AWSRole)--(m) RETURN n, m", "_Tenant_test")
assert "(n:AWSRole:_Tenant_test)" in result
assert "(m:_Tenant_test)" in result
# ---------------------------------------------------------------------------
# Pass A - Labeled node patterns (all clauses)
# ---------------------------------------------------------------------------
@@ -171,6 +171,8 @@ def run(tenant_id: str, scan_id: str, task_id: str) -> dict[str, Any]:
update_tag=tmp_cartography_config.update_tag,
)
graph_database.verify_scan_databases_available()
# Starting the Attack Paths scan
if not db_utils.starting_attack_paths_scan(
attack_paths_scan, tenant_cartography_config
@@ -34,6 +34,13 @@ SYNC_RESULT_EMPTY = {
@pytest.mark.django_db
class TestAttackPathsRun:
@pytest.fixture(autouse=True)
def mock_graph_database_preflight(self):
with patch(
"tasks.jobs.attack_paths.scan.graph_database.verify_scan_databases_available"
) as mock_preflight:
yield mock_preflight
# Patching with decorators as we got a `SyntaxError: too many statically nested blocks` error if we use context managers
@patch("tasks.jobs.attack_paths.scan.graph_database.drop_database")
@patch(
@@ -190,6 +197,64 @@ class TestAttackPathsRun:
# don't switch to the new catalog/sink before the graph is live.
mock_set_scan_migrated.assert_called_once_with(attack_paths_scan, True, "neo4j")
def test_run_preflight_failure_does_not_start_scan(
self,
mock_graph_database_preflight,
tenants_fixture,
providers_fixture,
scans_fixture,
):
tenant = tenants_fixture[0]
provider = providers_fixture[0]
provider.provider = Provider.ProviderChoices.AWS
provider.save()
scan = scans_fixture[0]
scan.provider = provider
scan.save()
attack_paths_scan = AttackPathsScan.objects.create(
tenant_id=tenant.id,
provider=provider,
scan=scan,
state=StateChoices.SCHEDULED,
)
mock_graph_database_preflight.side_effect = RuntimeError("graph unavailable")
with (
patch(
"tasks.jobs.attack_paths.scan.rls_transaction",
new=lambda *args, **kwargs: nullcontext(),
),
patch(
"tasks.jobs.attack_paths.scan.initialize_prowler_provider",
return_value=MagicMock(_enabled_regions=["us-east-1"]),
),
patch(
"tasks.jobs.attack_paths.scan.graph_database.get_ingest_uri",
return_value="bolt://neo4j",
),
patch(
"tasks.jobs.attack_paths.scan.db_utils.retrieve_attack_paths_scan",
return_value=attack_paths_scan,
),
patch(
"tasks.jobs.attack_paths.scan.get_cartography_ingestion_function",
return_value=MagicMock(return_value={}),
),
patch(
"tasks.jobs.attack_paths.scan.db_utils.starting_attack_paths_scan"
) as mock_starting,
patch(
"tasks.jobs.attack_paths.scan.graph_database.create_database"
) as mock_create_db,
):
with pytest.raises(RuntimeError, match="graph unavailable"):
attack_paths_run(str(tenant.id), str(scan.id), "task-123")
mock_graph_database_preflight.assert_called_once_with()
mock_starting.assert_not_called()
mock_create_db.assert_not_called()
@patch(
"tasks.jobs.attack_paths.scan.utils.stringify_exception",
return_value="Cartography failed: ingestion boom",