feat(attack-paths): configure Neo4j for read-only queries (#10140)

This commit is contained in:
Josema Camacho
2026-02-24 10:15:22 +01:00
committed by GitHub
parent 51dbf17faa
commit e688e60fde
10 changed files with 393 additions and 47 deletions

View File

@@ -27,6 +27,7 @@ All notable changes to the **Prowler API** are documented in this file.
- Attack Paths: Upgrade Cartography from fork 0.126.1 to upstream 0.129.0 and Neo4j driver from 5.x to 6.x [(#10110)](https://github.com/prowler-cloud/prowler/pull/10110)
- Attack Paths: Query results now filtered by provider, preventing future cross-tenant and cross-provider data leakage [(#10118)](https://github.com/prowler-cloud/prowler/pull/10118)
- Attack Paths: Add private labels and properties in Attack Paths graphs for avoiding future overlapping with Cartography's ones [(#10124)](https://github.com/prowler-cloud/prowler/pull/10124)
- Attack Paths: Query endpoint now executes queries in read-only mode [(#10140)](https://github.com/prowler-cloud/prowler/pull/10140)
### 🐞 Fixed

View File

@@ -2,6 +2,8 @@ import atexit
import logging
import threading
from typing import Any
from contextlib import contextmanager
from typing import Iterator
from uuid import UUID
@@ -12,6 +14,7 @@ import neo4j.exceptions
from django.conf import settings
from api.attack_paths.retryable_session import RetryableSession
from config.env import env
from tasks.jobs.attack_paths.config import (
BATCH_SIZE,
DEPRECATED_PROVIDER_RESOURCE_LABEL,
@@ -21,7 +24,16 @@ from tasks.jobs.attack_paths.config import (
logging.getLogger("neo4j").setLevel(logging.ERROR)
logging.getLogger("neo4j").propagate = False
SERVICE_UNAVAILABLE_MAX_RETRIES = 3
SERVICE_UNAVAILABLE_MAX_RETRIES = env.int(
"ATTACK_PATHS_SERVICE_UNAVAILABLE_MAX_RETRIES", default=3
)
READ_QUERY_TIMEOUT_SECONDS = env.int(
"ATTACK_PATHS_READ_QUERY_TIMEOUT_SECONDS", default=30
)
READ_EXCEPTION_CODES = [
"Neo.ClientError.Statement.AccessMode",
"Neo.ClientError.Procedure.ProcedureNotFound",
]
# Module-level process-wide driver singleton
_driver: neo4j.Driver | None = None
@@ -78,17 +90,29 @@ def close_driver() -> None: # TODO: Use it
@contextmanager
def get_session(database: str | None = None) -> Iterator[RetryableSession]:
def get_session(
database: str | None = None, default_access_mode: str | None = None
) -> Iterator[RetryableSession]:
session_wrapper: RetryableSession | None = None
try:
session_wrapper = RetryableSession(
session_factory=lambda: get_driver().session(database=database),
session_factory=lambda: get_driver().session(
database=database, default_access_mode=default_access_mode
),
max_retries=SERVICE_UNAVAILABLE_MAX_RETRIES,
)
yield session_wrapper
except neo4j.exceptions.Neo4jError as exc:
if (
default_access_mode == neo4j.READ_ACCESS
and exc.code in READ_EXCEPTION_CODES
):
message = "Read query not allowed"
code = READ_EXCEPTION_CODES[0]
raise WriteQueryNotAllowedException(message=message, code=code)
message = exc.message if exc.message is not None else str(exc)
raise GraphDatabaseQueryException(message=message, code=exc.code)
@@ -97,6 +121,22 @@ def get_session(database: str | None = None) -> Iterator[RetryableSession]:
session_wrapper.close()
def execute_read_query(
    database: str,
    cypher: str,
    parameters: dict[str, Any] | None = None,
) -> neo4j.graph.Graph:
    """Run a Cypher query in a read-only managed transaction and return its graph.

    The session is opened with ``neo4j.READ_ACCESS`` and the work is submitted
    through ``session.execute_read``. The transaction is bounded by
    ``READ_QUERY_TIMEOUT_SECONDS``.

    Args:
        database: Name of the Neo4j database to query.
        cypher: Cypher statement to execute.
        parameters: Query parameters; ``None`` is treated as an empty dict.

    Returns:
        The graph built from the query result via ``Result.graph()``.

    Raises:
        WriteQueryNotAllowedException: Raised by ``get_session`` when the
            driver reports one of ``READ_EXCEPTION_CODES``.
        GraphDatabaseQueryException: Raised by ``get_session`` for any other
            ``Neo4jError``.
    """

    # For managed transactions the driver reads the timeout from the
    # transaction function itself, which is what `neo4j.unit_of_work` sets.
    # Keyword arguments passed to `tx.run` are merged into the Cypher
    # parameters instead, so on their own they never bound execution time.
    # NOTE(review): assumes the session wrapper hands this function object
    # unchanged to the driver's `execute_read` -- confirm in RetryableSession.
    @neo4j.unit_of_work(timeout=READ_QUERY_TIMEOUT_SECONDS)
    def _run(tx: neo4j.ManagedTransaction) -> neo4j.graph.Graph:
        # NOTE(review): this `timeout` keyword only reaches the server as a
        # Cypher parameter named `timeout`; it is kept so the call shape stays
        # unchanged for existing callers and tests. Consider dropping it.
        result = tx.run(
            cypher, parameters or {}, timeout=READ_QUERY_TIMEOUT_SECONDS
        )
        # Build the graph while the transaction is still open.
        return result.graph()

    with get_session(database, default_access_mode=neo4j.READ_ACCESS) as session:
        return session.execute_read(_run)
def create_database(database: str) -> None:
query = "CREATE DATABASE $database IF NOT EXISTS"
parameters = {"database": database}
@@ -182,3 +222,7 @@ class GraphDatabaseQueryException(Exception):
return f"{self.code}: {self.message}"
return self.message
class WriteQueryNotAllowedException(GraphDatabaseQueryException):
    """Query failure reported when a read-access session hits a read-mode error.

    Carries the same ``message`` / ``code`` payload as its parent so callers
    can catch it specifically or fall back to ``GraphDatabaseQueryException``.
    """

View File

@@ -2,7 +2,7 @@ import logging
from typing import Any, Iterable
from rest_framework.exceptions import APIException, ValidationError
from rest_framework.exceptions import APIException, PermissionDenied, ValidationError
from api.attack_paths import database as graph_database, AttackPathsQueryDefinition
from config.custom_logging import BackendLogger
@@ -87,9 +87,17 @@ def execute_attack_paths_query(
provider_id: str,
) -> dict[str, Any]:
try:
with graph_database.get_session(database_name) as session:
result = session.run(definition.cypher, parameters)
return _serialize_graph(result.graph(), provider_id)
graph = graph_database.execute_read_query(
database=database_name,
cypher=definition.cypher,
parameters=parameters,
)
return _serialize_graph(graph, provider_id)
except graph_database.WriteQueryNotAllowedException:
raise PermissionDenied(
"Attack Paths query execution failed: read-only queries are enforced"
)
except graph_database.GraphDatabaseQueryException as exc:
logger.error(f"Query failed for Attack Paths query `{definition.id}`: {exc}")

View File

@@ -1,14 +1,21 @@
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import pytest
from rest_framework.exceptions import APIException, ValidationError
import neo4j
import neo4j.exceptions
from rest_framework.exceptions import APIException, PermissionDenied, ValidationError
from api.attack_paths import database as graph_database
from api.attack_paths import views_helpers
def _make_neo4j_error(message, code):
    """Return a ``Neo4jError`` hydrated from the given code and message."""
    hydrate = neo4j.exceptions.Neo4jError._hydrate_neo4j
    return hydrate(message=message, code=code)
def test_normalize_run_payload_extracts_attributes_section():
payload = {
"data": {
@@ -122,28 +129,25 @@ def test_execute_attack_paths_query_serializes_graph(
)
graph = SimpleNamespace(nodes=[node, node_2], relationships=[relationship])
run_result = MagicMock()
run_result.graph.return_value = graph
session = MagicMock()
session.run.return_value = run_result
session_ctx = MagicMock()
session_ctx.__enter__.return_value = session
session_ctx.__exit__.return_value = False
graph_result = MagicMock()
graph_result.nodes = graph.nodes
graph_result.relationships = graph.relationships
database_name = "db-tenant-test-tenant-id"
with patch(
"api.attack_paths.views_helpers.graph_database.get_session",
return_value=session_ctx,
) as mock_get_session:
"api.attack_paths.views_helpers.graph_database.execute_read_query",
return_value=graph_result,
) as mock_execute_read_query:
result = views_helpers.execute_attack_paths_query(
database_name, definition, parameters, provider_id=provider_id
)
mock_get_session.assert_called_once_with(database_name)
session.run.assert_called_once_with(definition.cypher, parameters)
mock_execute_read_query.assert_called_once_with(
database=database_name,
cypher=definition.cypher,
parameters=parameters,
)
assert result["nodes"][0]["id"] == "node-1"
assert result["nodes"][0]["properties"]["complex"]["items"][0] == "value"
assert result["relationships"][0]["label"] == "OWNS"
@@ -163,17 +167,10 @@ def test_execute_attack_paths_query_wraps_graph_errors(
database_name = "db-tenant-test-tenant-id"
parameters = {"provider_uid": "123"}
class ExplodingContext:
def __enter__(self):
raise graph_database.GraphDatabaseQueryException("boom")
def __exit__(self, exc_type, exc, tb):
return False
with (
patch(
"api.attack_paths.views_helpers.graph_database.get_session",
return_value=ExplodingContext(),
"api.attack_paths.views_helpers.graph_database.execute_read_query",
side_effect=graph_database.GraphDatabaseQueryException("boom"),
),
patch("api.attack_paths.views_helpers.logger") as mock_logger,
):
@@ -185,6 +182,33 @@ def test_execute_attack_paths_query_wraps_graph_errors(
mock_logger.error.assert_called_once()
def test_execute_attack_paths_query_raises_permission_denied_on_read_only(
    attack_paths_query_definition_factory,
):
    """A write rejection from the graph layer surfaces as ``PermissionDenied``."""
    read_only_error = graph_database.WriteQueryNotAllowedException(
        message="Read query not allowed",
        code="Neo.ClientError.Statement.AccessMode",
    )
    definition = attack_paths_query_definition_factory(
        id="aws-rds",
        name="RDS",
        short_description="Short desc",
        description="",
        cypher="MATCH (n) RETURN n",
        parameters=[],
    )

    with (
        patch(
            "api.attack_paths.views_helpers.graph_database.execute_read_query",
            side_effect=read_only_error,
        ),
        pytest.raises(PermissionDenied),
    ):
        views_helpers.execute_attack_paths_query(
            "db-tenant-test-tenant-id",
            definition,
            {"provider_uid": "123"},
            provider_id="test-provider-123",
        )
def test_serialize_graph_filters_by_provider_id(attack_paths_graph_stub_classes):
provider_id = "provider-keep"
@@ -216,3 +240,105 @@ def test_serialize_graph_filters_by_provider_id(attack_paths_graph_stub_classes)
assert result["nodes"][0]["id"] == "n1"
assert len(result["relationships"]) == 1
assert result["relationships"][0]["id"] == "r1"
# -- execute_read_query read-only enforcement ---------------------------------
@pytest.fixture
def mock_neo4j_session():
    """Yield a fake Neo4j session served by a patched ``get_driver``."""
    fake_session = MagicMock(spec=neo4j.Session)
    fake_driver = MagicMock(spec=neo4j.Driver)
    fake_driver.session.return_value = fake_session

    driver_patch = patch(
        "api.attack_paths.database.get_driver", return_value=fake_driver
    )
    with driver_patch:
        yield fake_session
def test_execute_read_query_succeeds_with_select(mock_neo4j_session):
    """A plain MATCH query returns whatever ``execute_read`` produced."""
    expected_graph = MagicMock(spec=neo4j.graph.Graph)
    mock_neo4j_session.execute_read.return_value = expected_graph

    actual_graph = graph_database.execute_read_query(
        cypher="MATCH (n:AWSAccount) RETURN n LIMIT 10",
        database="test-db",
    )

    assert actual_graph is expected_graph
def test_execute_read_query_rejects_create(mock_neo4j_session):
    """An AccessMode error on CREATE becomes ``WriteQueryNotAllowedException``."""
    access_mode_error = _make_neo4j_error(
        "Writing in read access mode not allowed",
        "Neo.ClientError.Statement.AccessMode",
    )
    mock_neo4j_session.execute_read.side_effect = access_mode_error
    write_cypher = "CREATE (n:Node {name: 'test'}) RETURN n"

    with pytest.raises(graph_database.WriteQueryNotAllowedException):
        graph_database.execute_read_query(database="test-db", cypher=write_cypher)
def test_execute_read_query_rejects_update(mock_neo4j_session):
    """An AccessMode error on SET becomes ``WriteQueryNotAllowedException``."""
    access_mode_error = _make_neo4j_error(
        "Writing in read access mode not allowed",
        "Neo.ClientError.Statement.AccessMode",
    )
    mock_neo4j_session.execute_read.side_effect = access_mode_error
    write_cypher = "MATCH (n:Node) SET n.name = 'updated' RETURN n"

    with pytest.raises(graph_database.WriteQueryNotAllowedException):
        graph_database.execute_read_query(database="test-db", cypher=write_cypher)
def test_execute_read_query_rejects_delete(mock_neo4j_session):
    """An AccessMode error on DELETE becomes ``WriteQueryNotAllowedException``."""
    access_mode_error = _make_neo4j_error(
        "Writing in read access mode not allowed",
        "Neo.ClientError.Statement.AccessMode",
    )
    mock_neo4j_session.execute_read.side_effect = access_mode_error
    write_cypher = "MATCH (n:Node) DELETE n"

    with pytest.raises(graph_database.WriteQueryNotAllowedException):
        graph_database.execute_read_query(database="test-db", cypher=write_cypher)
@pytest.mark.parametrize(
    "cypher",
    [
        "CALL apoc.create.vNode(['Label'], {name: 'test'}) YIELD node RETURN node",
        "MATCH (a)-[r]->(b) CALL apoc.create.vRelationship(a, 'REL', {}, b) YIELD rel RETURN rel",
    ],
    ids=["apoc.create.vNode", "apoc.create.vRelationship"],
)
def test_execute_read_query_succeeds_with_apoc_virtual_create(
    mock_neo4j_session, cypher
):
    """Queries calling the virtual APOC creators return the mocked graph."""
    expected_graph = MagicMock(spec=neo4j.graph.Graph)
    mock_neo4j_session.execute_read.return_value = expected_graph

    actual_graph = graph_database.execute_read_query(
        cypher=cypher, database="test-db"
    )

    assert actual_graph is expected_graph
@pytest.mark.parametrize(
    "cypher",
    [
        "CALL apoc.create.node(['Label'], {name: 'test'}) YIELD node RETURN node",
        "MATCH (a), (b) CALL apoc.create.relationship(a, 'REL', {}, b) YIELD rel RETURN rel",
    ],
    ids=["apoc.create.Node", "apoc.create.Relationship"],
)
def test_execute_read_query_rejects_apoc_real_create(mock_neo4j_session, cypher):
    """A ProcedureNotFound error becomes ``WriteQueryNotAllowedException``."""
    procedure_missing = _make_neo4j_error(
        "There is no procedure with the name `apoc.create.node` registered",
        "Neo.ClientError.Procedure.ProcedureNotFound",
    )
    mock_neo4j_session.execute_read.side_effect = procedure_missing

    with pytest.raises(graph_database.WriteQueryNotAllowedException):
        graph_database.execute_read_query(cypher=cypher, database="test-db")

View File

@@ -9,6 +9,7 @@ remain lazy. These tests validate the database module behavior itself.
import threading
from unittest.mock import MagicMock, patch
import neo4j
import pytest
@@ -241,6 +242,146 @@ class TestCloseDriver:
assert db_module._driver is None
class TestExecuteReadQuery:
    """Test read query execution helper."""

    def test_execute_read_query_calls_read_session_and_returns_result(self):
        """The helper opens a READ session, runs the query and returns its graph."""
        import api.attack_paths.database as db_module

        database_name = "db-tenant-test-tenant-id"
        cypher = "MATCH (n) RETURN n"
        query_parameters = {"provider_uid": "123"}

        expected_graph = MagicMock()
        run_result = MagicMock()
        run_result.graph.return_value = expected_graph
        tx = MagicMock()
        tx.run.return_value = run_result

        # Emulate the driver: invoke the transaction function with our fake tx.
        session = MagicMock()
        session.execute_read.side_effect = lambda fn: fn(tx)

        session_ctx = MagicMock()
        session_ctx.__enter__.return_value = session
        session_ctx.__exit__.return_value = False

        with patch(
            "api.attack_paths.database.get_session",
            return_value=session_ctx,
        ) as mock_get_session:
            result = db_module.execute_read_query(
                database_name, cypher, query_parameters
            )

        mock_get_session.assert_called_once_with(
            database_name,
            default_access_mode=neo4j.READ_ACCESS,
        )
        session.execute_read.assert_called_once()
        tx.run.assert_called_once_with(
            cypher,
            {"provider_uid": "123"},
            timeout=db_module.READ_QUERY_TIMEOUT_SECONDS,
        )
        run_result.graph.assert_called_once_with()
        assert result is expected_graph

    def test_execute_read_query_defaults_parameters_to_empty_dict(self):
        """Omitting ``parameters`` sends an empty dict to ``tx.run``."""
        import api.attack_paths.database as db_module

        cypher = "MATCH (n) RETURN n"

        run_result = MagicMock()
        run_result.graph.return_value = MagicMock()
        tx = MagicMock()
        tx.run.return_value = run_result

        session = MagicMock()
        session.execute_read.side_effect = lambda fn: fn(tx)

        session_ctx = MagicMock()
        session_ctx.__enter__.return_value = session
        session_ctx.__exit__.return_value = False

        with patch(
            "api.attack_paths.database.get_session",
            return_value=session_ctx,
        ):
            db_module.execute_read_query("db-tenant-test-tenant-id", cypher)

        tx.run.assert_called_once_with(
            cypher,
            {},
            timeout=db_module.READ_QUERY_TIMEOUT_SECONDS,
        )
        run_result.graph.assert_called_once_with()
class TestGetSessionReadOnly:
    """Test that get_session translates Neo4j read-mode errors."""

    @pytest.fixture(autouse=True)
    def reset_module_state(self):
        """Clear the module-level driver for the test and restore it afterwards."""
        import api.attack_paths.database as db_module

        saved_driver = db_module._driver
        db_module._driver = None

        yield

        db_module._driver = saved_driver

    @staticmethod
    def _driver_whose_run_raises(code, message):
        """Build a mock driver whose session ``run`` raises a hydrated Neo4jError."""
        failing_session = MagicMock()
        failing_session.run.side_effect = neo4j.exceptions.Neo4jError._hydrate_neo4j(
            code=code,
            message=message,
        )
        driver = MagicMock()
        driver.session.return_value = failing_session
        return driver

    @pytest.mark.parametrize(
        "neo4j_code",
        [
            "Neo.ClientError.Statement.AccessMode",
            "Neo.ClientError.Procedure.ProcedureNotFound",
        ],
    )
    def test_get_session_raises_write_query_not_allowed(self, neo4j_code):
        """Read-mode Neo4j errors should raise `WriteQueryNotAllowedException`."""
        import api.attack_paths.database as db_module

        db_module._driver = self._driver_whose_run_raises(
            neo4j_code, "Write operations are not allowed"
        )

        with pytest.raises(db_module.WriteQueryNotAllowedException):
            with db_module.get_session(
                default_access_mode=neo4j.READ_ACCESS
            ) as session:
                session.run("CREATE (n) RETURN n")

    def test_get_session_raises_generic_exception_for_other_errors(self):
        """Non-read-mode Neo4j errors should raise GraphDatabaseQueryException."""
        import api.attack_paths.database as db_module

        db_module._driver = self._driver_whose_run_raises(
            "Neo.ClientError.Statement.SyntaxError", "Invalid syntax"
        )

        with pytest.raises(db_module.GraphDatabaseQueryException):
            with db_module.get_session(
                default_access_mode=neo4j.READ_ACCESS
            ) as session:
                session.run("INVALID CYPHER")
class TestThreadSafety:
"""Test thread-safe initialization."""