"""Shared batched deletion helpers for sink backends."""

import logging
import time
from typing import Any

# Relationship-delete Cypher templates, keyed by the phase name that is passed
# to ``delete_batches`` as ``phase``. Callers iterate this dict in insertion
# order, so outgoing relationships are drained before incoming ones. Each
# pattern is directed, so no ``DISTINCT`` is used when limiting the batch.
RELATIONSHIP_DELETE_QUERY_TEMPLATES = {
    "outgoing relationship": """
        MATCH (n:`{provider_label}`)-[r]->()
        WITH r LIMIT $batch_size
        DELETE r
        RETURN COUNT(r) AS deleted_rels_count
    """,
    "incoming relationship": """
        MATCH (n:`{provider_label}`)<-[r]-()
        WITH r LIMIT $batch_size
        DELETE r
        RETURN COUNT(r) AS deleted_rels_count
    """,
}

# Node-delete Cypher template. Uses plain ``DELETE`` (not ``DETACH DELETE``),
# so it is meant to run after the relationship templates above have drained.
NODE_DELETE_QUERY_TEMPLATE = """
    MATCH (n:{provider_resource_label}:`{provider_label}`)
    WITH n LIMIT $batch_size
    DELETE n
    RETURN COUNT(n) AS deleted_nodes_count
"""


def delete_batches(
    *,
    session: Any,
    logger: logging.Logger,
    log_target: str,
    provider_id: str,
    query: str,
    phase: str,
    count_key: str,
    total_key: str,
    deleted_key: str,
    initial_total: int,
    batch_size: int,
    drop_t0: float,
) -> tuple[int, int]:
    """Run ``query`` in a loop until one round reports nothing deleted.

    Every round executes ``session.run(query, {"batch_size": batch_size})``,
    takes ``.single()`` of the result and reads ``count_key`` from it. A
    missing record, a ``None`` count or a zero count ends the loop.

    Args:
        session: Object exposing ``run(query, params)`` whose result has a
            ``single()`` method.
        logger: Logger that receives one "Deleting" line before every round
            and one "Deleted" line after every non-empty round.
        log_target: Human-readable sink name used in the log lines.
        provider_id: Provider identifier used in the log lines.
        query: Fully formatted Cypher statement taking ``$batch_size``.
        phase: Label for what is being deleted, used in the log lines.
        count_key: Key of the deleted-count column in the returned record.
        total_key: Suffix of the ``total_<...>`` field in the log lines.
        deleted_key: Name of the per-batch count field in the log lines.
        initial_total: Starting value for the running total, so several
            calls can accumulate into one counter.
        batch_size: Value bound to ``$batch_size``.
        drop_t0: ``time.perf_counter()`` reading that elapsed times are
            measured from.

    Returns:
        A ``(total, batches)`` tuple: ``initial_total`` plus everything
        deleted by this call, and the number of non-empty rounds it ran.
    """
    running_total = initial_total
    completed_batches = 0

    def elapsed() -> float:
        return time.perf_counter() - drop_t0

    while True:
        logger.info(
            "Deleting %s batch from %s "
            "(provider=%s, batch=%s, total_%s=%s, elapsed=%.3fs)",
            phase,
            log_target,
            provider_id,
            completed_batches + 1,
            total_key,
            running_total,
            elapsed(),
        )
        row = session.run(query, {"batch_size": batch_size}).single()
        removed = row[count_key] if row else 0
        if not removed:
            # Empty round: the matching set is drained.
            break

        completed_batches += 1
        running_total += removed
        logger.info(
            "Deleted %s batch from %s "
            "(provider=%s, batch=%s, %s=%s, total_%s=%s, elapsed=%.3fs)",
            phase,
            log_target,
            provider_id,
            completed_batches,
            deleted_key,
            removed,
            total_key,
            running_total,
            elapsed(),
        )

    return running_total, completed_batches
- deleted_count = 1 - while deleted_count > 0: - next_batch = relationship_batches + 1 - logger.info( - "Deleting relationship batch from Neo4j sink database %s " - "(provider=%s, batch=%s, total_rels=%s, elapsed=%.3fs)", - database, - provider_id, - next_batch, - deleted_relationships, - time.perf_counter() - drop_t0, + log_target = f"Neo4j sink database {database}" + for ( + phase, + query_template, + ) in RELATIONSHIP_DELETE_QUERY_TEMPLATES.items(): + deleted_relationships, phase_batches = delete_batches( + session=session, + logger=logger, + log_target=log_target, + provider_id=provider_id, + query=query_template.format(provider_label=provider_label), + phase=phase, + count_key="deleted_rels_count", + total_key="rels", + deleted_key="deleted_rels", + initial_total=deleted_relationships, + batch_size=BATCH_SIZE, + drop_t0=drop_t0, ) - result = session.run( - f""" - MATCH (:`{provider_label}`)-[r]-() - WITH DISTINCT r LIMIT $batch_size - DELETE r - RETURN COUNT(r) AS deleted_rels_count - """, - {"batch_size": BATCH_SIZE}, - ) - deleted_count = result.single().get("deleted_rels_count", 0) - if deleted_count > 0: - relationship_batches += 1 - deleted_relationships += deleted_count - logger.info( - "Deleted relationship batch from Neo4j sink database %s " - "(provider=%s, batch=%s, deleted_rels=%s, " - "total_rels=%s, elapsed=%.3fs)", - database, - provider_id, - relationship_batches, - deleted_count, - deleted_relationships, - time.perf_counter() - drop_t0, - ) + relationship_batches += phase_batches - # Phase 2: delete the now relationship-free nodes in batches. 
- deleted_count = 1 - while deleted_count > 0: - next_batch = node_batches + 1 - logger.info( - "Deleting node batch from Neo4j sink database %s " - "(provider=%s, batch=%s, total_nodes=%s, elapsed=%.3fs)", - database, - provider_id, - next_batch, - deleted_nodes, - time.perf_counter() - drop_t0, - ) - result = session.run( - f""" - MATCH (n:{PROVIDER_RESOURCE_LABEL}:`{provider_label}`) - WITH n LIMIT $batch_size - DELETE n - RETURN COUNT(n) AS deleted_nodes_count - """, - {"batch_size": BATCH_SIZE}, - ) - deleted_count = result.single().get("deleted_nodes_count", 0) - if deleted_count > 0: - node_batches += 1 - deleted_nodes += deleted_count - logger.info( - "Deleted node batch from Neo4j sink database %s " - "(provider=%s, batch=%s, deleted_nodes=%s, " - "total_nodes=%s, elapsed=%.3fs)", - database, - provider_id, - node_batches, - deleted_count, - deleted_nodes, - time.perf_counter() - drop_t0, - ) + deleted_nodes, node_batches = delete_batches( + session=session, + logger=logger, + log_target=log_target, + provider_id=provider_id, + query=NODE_DELETE_QUERY_TEMPLATE.format( + provider_label=provider_label, + provider_resource_label=PROVIDER_RESOURCE_LABEL, + ), + phase="node", + count_key="deleted_nodes_count", + total_key="nodes", + deleted_key="deleted_nodes", + initial_total=0, + batch_size=BATCH_SIZE, + drop_t0=drop_t0, + ) except GraphDatabaseQueryException as exc: if exc.code == DATABASE_NOT_FOUND_CODE: diff --git a/api/src/backend/api/attack_paths/sink/neptune.py b/api/src/backend/api/attack_paths/sink/neptune.py index ad20d080b8..b0d12069a3 100644 --- a/api/src/backend/api/attack_paths/sink/neptune.py +++ b/api/src/backend/api/attack_paths/sink/neptune.py @@ -27,6 +27,11 @@ import neo4j import neo4j.exceptions from api.attack_paths.retryable_session import RetryableSession from api.attack_paths.sink.base import SinkDatabase +from api.attack_paths.sink.drop import ( + NODE_DELETE_QUERY_TEMPLATE, + RELATIONSHIP_DELETE_QUERY_TEMPLATES, + delete_batches, +) 
from botocore.auth import SigV4Auth from botocore.awsrequest import AWSRequest from botocore.session import Session as BotoSession @@ -296,78 +301,40 @@ class NeptuneSink(SinkDatabase): "Opened Neptune writer session for provider graph drop (provider=%s)", provider_id, ) - while True: - next_batch = relationship_batches + 1 - logger.info( - "Deleting relationship batch from Neptune sink " - "(provider=%s, batch=%s, total_rels=%s, elapsed=%.3fs)", - provider_id, - next_batch, - deleted_relationships, - time.perf_counter() - drop_t0, - ) - result = session.run( - f""" - MATCH (:`{provider_label}`)-[r]-() - WITH DISTINCT r LIMIT $batch_size - DELETE r - RETURN COUNT(r) AS deleted_rels_count - """, - {"batch_size": BATCH_SIZE}, - ) - record = result.single() - deleted_rels = (record["deleted_rels_count"] if record else 0) or 0 - if deleted_rels == 0: - break - relationship_batches += 1 - deleted_relationships += deleted_rels - logger.info( - "Deleted relationship batch from Neptune sink " - "(provider=%s, batch=%s, deleted_rels=%s, total_rels=%s, " - "elapsed=%.3fs)", - provider_id, - relationship_batches, - deleted_rels, - deleted_relationships, - time.perf_counter() - drop_t0, + for phase, query_template in RELATIONSHIP_DELETE_QUERY_TEMPLATES.items(): + deleted_relationships, phase_batches = delete_batches( + session=session, + logger=logger, + log_target="Neptune sink", + provider_id=provider_id, + query=query_template.format(provider_label=provider_label), + phase=phase, + count_key="deleted_rels_count", + total_key="rels", + deleted_key="deleted_rels", + initial_total=deleted_relationships, + batch_size=BATCH_SIZE, + drop_t0=drop_t0, ) + relationship_batches += phase_batches - deleted_nodes = 0 - while True: - next_batch = node_batches + 1 - logger.info( - "Deleting node batch from Neptune sink " - "(provider=%s, batch=%s, total_nodes=%s, elapsed=%.3fs)", - provider_id, - next_batch, - deleted_nodes, - time.perf_counter() - drop_t0, - ) - result = session.run( - 
def _count_result(key: str, count: int) -> MagicMock:
    """Return a fake query result whose ``single()`` yields ``{key: count}``."""
    result = MagicMock()
    result.single = MagicMock(return_value={key: count})
    return result


def _directed_drop_results(
    outgoing_rels: int,
    incoming_rels: int,
    nodes: int,
) -> list[MagicMock]:
    """Build the six ``session.run`` results of one directed drop.

    Each of the three phases (outgoing relationships, incoming relationships,
    nodes) contributes one non-empty batch with the given count, followed by
    a zero-count batch that ends the phase.
    """
    phases = (
        ("deleted_rels_count", outgoing_rels),
        ("deleted_rels_count", incoming_rels),
        ("deleted_nodes_count", nodes),
    )
    return [
        _count_result(count_key, count)
        for count_key, deleted in phases
        for count in (deleted, 0)
    ]
class TestNeptuneSinkDropSubgraph:
    """Neptune drop runs directed relationship batches, then node batches."""

    def test_drop_subgraph_deletes_directed_rels_before_nodes_in_bounded_batches(self):
        from api.attack_paths.sink.neptune import NeptuneSink

        neptune_sink = NeptuneSink()
        fake_session = MagicMock()
        # One non-empty batch plus one empty batch per phase: six queries.
        fake_session.run.side_effect = _directed_drop_results(
            outgoing_rels=50,
            incoming_rels=30,
            nodes=10,
        )

        with patch.object(
            neptune_sink, "get_session", return_value=_session_ctx(fake_session)
        ):
            node_count = neptune_sink.drop_subgraph("ignored", "provider-1")

        # The return value reflects the node phase only.
        assert node_count == 10
        assert fake_session.run.call_count == 6
        issued = [call.args[0] for call in fake_session.run.call_args_list]

        # Phase order: outgoing relationships, incoming relationships, nodes.
        assert ")-[r]->()" in issued[0]
        assert ")<-[r]-()" in issued[2]
        assert "DELETE n" in issued[4]
        assert not any("DETACH DELETE" in text for text in issued)
        assert not any("DISTINCT r" in text for text in issued)

        # Every relationship delete must precede the first node delete.
        node_positions = [pos for pos, text in enumerate(issued) if "DELETE n" in text]
        rel_positions = [pos for pos, text in enumerate(issued) if "DELETE r" in text]
        assert rel_positions[-1] < node_positions[0]
assert deleted == 10 - assert session.run.call_count == 4 + assert session.run.call_count == 6 queries = [call.args[0] for call in session.run.call_args_list] # Regression guard: the memory blow-up was caused by DETACH DELETE. assert all("DETACH DELETE" not in query for query in queries) + assert all("DISTINCT r" not in query for query in queries) first_query = queries[0] assert "DELETE r" in first_query - # DISTINCT avoids double-counting relationships matched from both ends. - assert "DISTINCT r" in first_query + assert ")-[r]->()" in first_query assert ":`_Provider_00000000000000000000000000000abc`" in first_query - assert "DELETE n" in queries[2] + assert ")<-[r]-()" in queries[2] + assert "DELETE n" in queries[4] # Relationships must be fully drained before nodes are deleted. first_node = next(i for i, q in enumerate(queries) if "DELETE n" in q)