mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-07-04 19:21:51 +00:00
perf(api): optimize attack paths graph cleanup (#11755)
This commit is contained in:
@@ -10,6 +10,10 @@ All notable changes to the **Prowler API** are documented in this file.
|
||||
- Attack Paths: Scan task now checks the ingest Neo4j database and configured graph sink before starting graph ingestion [(#11743)](https://github.com/prowler-cloud/prowler/pull/11743)
|
||||
- Disable PowerShell telemetry in the API container image [(#11746)](https://github.com/prowler-cloud/prowler/pull/11746)
|
||||
|
||||
### 🐞 Fixed
|
||||
|
||||
- Attack Paths: Provider graph cleanup now deletes Neo4j and Neptune relationships in directed batches before deleting nodes [(#11755)](https://github.com/prowler-cloud/prowler/pull/11755)
|
||||
|
||||
---
|
||||
|
||||
## [1.32.2] (Prowler UNRELEASED)
|
||||
|
||||
@@ -0,0 +1,78 @@
|
||||
"""Shared batched deletion helpers for sink backends."""
|
||||
|
||||
import logging
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
RELATIONSHIP_DELETE_QUERY_TEMPLATES = {
|
||||
"outgoing relationship": """
|
||||
MATCH (n:`{provider_label}`)-[r]->()
|
||||
WITH r LIMIT $batch_size
|
||||
DELETE r
|
||||
RETURN COUNT(r) AS deleted_rels_count
|
||||
""",
|
||||
"incoming relationship": """
|
||||
MATCH (n:`{provider_label}`)<-[r]-()
|
||||
WITH r LIMIT $batch_size
|
||||
DELETE r
|
||||
RETURN COUNT(r) AS deleted_rels_count
|
||||
""",
|
||||
}
|
||||
|
||||
NODE_DELETE_QUERY_TEMPLATE = """
|
||||
MATCH (n:{provider_resource_label}:`{provider_label}`)
|
||||
WITH n LIMIT $batch_size
|
||||
DELETE n
|
||||
RETURN COUNT(n) AS deleted_nodes_count
|
||||
"""
|
||||
|
||||
|
||||
def delete_batches(
|
||||
*,
|
||||
session: Any,
|
||||
logger: logging.Logger,
|
||||
log_target: str,
|
||||
provider_id: str,
|
||||
query: str,
|
||||
phase: str,
|
||||
count_key: str,
|
||||
total_key: str,
|
||||
deleted_key: str,
|
||||
initial_total: int,
|
||||
batch_size: int,
|
||||
drop_t0: float,
|
||||
) -> tuple[int, int]:
|
||||
deleted_total = initial_total
|
||||
batches = 0
|
||||
while True:
|
||||
logger.info(
|
||||
"Deleting %s batch from %s "
|
||||
"(provider=%s, batch=%s, total_%s=%s, elapsed=%.3fs)",
|
||||
phase,
|
||||
log_target,
|
||||
provider_id,
|
||||
batches + 1,
|
||||
total_key,
|
||||
deleted_total,
|
||||
time.perf_counter() - drop_t0,
|
||||
)
|
||||
record = session.run(query, {"batch_size": batch_size}).single()
|
||||
deleted = (record[count_key] if record else 0) or 0
|
||||
if deleted == 0:
|
||||
return deleted_total, batches
|
||||
|
||||
batches += 1
|
||||
deleted_total += deleted
|
||||
logger.info(
|
||||
"Deleted %s batch from %s "
|
||||
"(provider=%s, batch=%s, %s=%s, total_%s=%s, elapsed=%.3fs)",
|
||||
phase,
|
||||
log_target,
|
||||
provider_id,
|
||||
batches,
|
||||
deleted_key,
|
||||
deleted,
|
||||
total_key,
|
||||
deleted_total,
|
||||
time.perf_counter() - drop_t0,
|
||||
)
|
||||
@@ -17,6 +17,11 @@ import neo4j
|
||||
import neo4j.exceptions
|
||||
from api.attack_paths.retryable_session import RetryableSession
|
||||
from api.attack_paths.sink.base import SinkDatabase
|
||||
from api.attack_paths.sink.drop import (
|
||||
NODE_DELETE_QUERY_TEMPLATE,
|
||||
RELATIONSHIP_DELETE_QUERY_TEMPLATES,
|
||||
delete_batches,
|
||||
)
|
||||
from config.env import env
|
||||
from django.conf import settings
|
||||
|
||||
@@ -204,10 +209,8 @@ class Neo4jSink(SinkDatabase):
|
||||
)
|
||||
|
||||
provider_label = get_provider_label(provider_id)
|
||||
deleted_nodes = 0
|
||||
deleted_relationships = 0
|
||||
relationship_batches = 0
|
||||
node_batches = 0
|
||||
deleted_nodes = deleted_relationships = 0
|
||||
relationship_batches = node_batches = 0
|
||||
drop_t0 = time.perf_counter()
|
||||
|
||||
logger.info(
|
||||
@@ -232,84 +235,44 @@ class Neo4jSink(SinkDatabase):
|
||||
database,
|
||||
provider_id,
|
||||
)
|
||||
# Phase 1: delete relationships incident to provider nodes in
|
||||
# batches. The undirected pattern matches an edge between two
|
||||
# provider nodes from both ends, so `DISTINCT r` dedupes it to
|
||||
# delete a full batch of unique relationships each round.
|
||||
deleted_count = 1
|
||||
while deleted_count > 0:
|
||||
next_batch = relationship_batches + 1
|
||||
logger.info(
|
||||
"Deleting relationship batch from Neo4j sink database %s "
|
||||
"(provider=%s, batch=%s, total_rels=%s, elapsed=%.3fs)",
|
||||
database,
|
||||
provider_id,
|
||||
next_batch,
|
||||
deleted_relationships,
|
||||
time.perf_counter() - drop_t0,
|
||||
log_target = f"Neo4j sink database {database}"
|
||||
for (
|
||||
phase,
|
||||
query_template,
|
||||
) in RELATIONSHIP_DELETE_QUERY_TEMPLATES.items():
|
||||
deleted_relationships, phase_batches = delete_batches(
|
||||
session=session,
|
||||
logger=logger,
|
||||
log_target=log_target,
|
||||
provider_id=provider_id,
|
||||
query=query_template.format(provider_label=provider_label),
|
||||
phase=phase,
|
||||
count_key="deleted_rels_count",
|
||||
total_key="rels",
|
||||
deleted_key="deleted_rels",
|
||||
initial_total=deleted_relationships,
|
||||
batch_size=BATCH_SIZE,
|
||||
drop_t0=drop_t0,
|
||||
)
|
||||
result = session.run(
|
||||
f"""
|
||||
MATCH (:`{provider_label}`)-[r]-()
|
||||
WITH DISTINCT r LIMIT $batch_size
|
||||
DELETE r
|
||||
RETURN COUNT(r) AS deleted_rels_count
|
||||
""",
|
||||
{"batch_size": BATCH_SIZE},
|
||||
)
|
||||
deleted_count = result.single().get("deleted_rels_count", 0)
|
||||
if deleted_count > 0:
|
||||
relationship_batches += 1
|
||||
deleted_relationships += deleted_count
|
||||
logger.info(
|
||||
"Deleted relationship batch from Neo4j sink database %s "
|
||||
"(provider=%s, batch=%s, deleted_rels=%s, "
|
||||
"total_rels=%s, elapsed=%.3fs)",
|
||||
database,
|
||||
provider_id,
|
||||
relationship_batches,
|
||||
deleted_count,
|
||||
deleted_relationships,
|
||||
time.perf_counter() - drop_t0,
|
||||
)
|
||||
relationship_batches += phase_batches
|
||||
|
||||
# Phase 2: delete the now relationship-free nodes in batches.
|
||||
deleted_count = 1
|
||||
while deleted_count > 0:
|
||||
next_batch = node_batches + 1
|
||||
logger.info(
|
||||
"Deleting node batch from Neo4j sink database %s "
|
||||
"(provider=%s, batch=%s, total_nodes=%s, elapsed=%.3fs)",
|
||||
database,
|
||||
provider_id,
|
||||
next_batch,
|
||||
deleted_nodes,
|
||||
time.perf_counter() - drop_t0,
|
||||
)
|
||||
result = session.run(
|
||||
f"""
|
||||
MATCH (n:{PROVIDER_RESOURCE_LABEL}:`{provider_label}`)
|
||||
WITH n LIMIT $batch_size
|
||||
DELETE n
|
||||
RETURN COUNT(n) AS deleted_nodes_count
|
||||
""",
|
||||
{"batch_size": BATCH_SIZE},
|
||||
)
|
||||
deleted_count = result.single().get("deleted_nodes_count", 0)
|
||||
if deleted_count > 0:
|
||||
node_batches += 1
|
||||
deleted_nodes += deleted_count
|
||||
logger.info(
|
||||
"Deleted node batch from Neo4j sink database %s "
|
||||
"(provider=%s, batch=%s, deleted_nodes=%s, "
|
||||
"total_nodes=%s, elapsed=%.3fs)",
|
||||
database,
|
||||
provider_id,
|
||||
node_batches,
|
||||
deleted_count,
|
||||
deleted_nodes,
|
||||
time.perf_counter() - drop_t0,
|
||||
)
|
||||
deleted_nodes, node_batches = delete_batches(
|
||||
session=session,
|
||||
logger=logger,
|
||||
log_target=log_target,
|
||||
provider_id=provider_id,
|
||||
query=NODE_DELETE_QUERY_TEMPLATE.format(
|
||||
provider_label=provider_label,
|
||||
provider_resource_label=PROVIDER_RESOURCE_LABEL,
|
||||
),
|
||||
phase="node",
|
||||
count_key="deleted_nodes_count",
|
||||
total_key="nodes",
|
||||
deleted_key="deleted_nodes",
|
||||
initial_total=0,
|
||||
batch_size=BATCH_SIZE,
|
||||
drop_t0=drop_t0,
|
||||
)
|
||||
|
||||
except GraphDatabaseQueryException as exc:
|
||||
if exc.code == DATABASE_NOT_FOUND_CODE:
|
||||
|
||||
@@ -27,6 +27,11 @@ import neo4j
|
||||
import neo4j.exceptions
|
||||
from api.attack_paths.retryable_session import RetryableSession
|
||||
from api.attack_paths.sink.base import SinkDatabase
|
||||
from api.attack_paths.sink.drop import (
|
||||
NODE_DELETE_QUERY_TEMPLATE,
|
||||
RELATIONSHIP_DELETE_QUERY_TEMPLATES,
|
||||
delete_batches,
|
||||
)
|
||||
from botocore.auth import SigV4Auth
|
||||
from botocore.awsrequest import AWSRequest
|
||||
from botocore.session import Session as BotoSession
|
||||
@@ -296,78 +301,40 @@ class NeptuneSink(SinkDatabase):
|
||||
"Opened Neptune writer session for provider graph drop (provider=%s)",
|
||||
provider_id,
|
||||
)
|
||||
while True:
|
||||
next_batch = relationship_batches + 1
|
||||
logger.info(
|
||||
"Deleting relationship batch from Neptune sink "
|
||||
"(provider=%s, batch=%s, total_rels=%s, elapsed=%.3fs)",
|
||||
provider_id,
|
||||
next_batch,
|
||||
deleted_relationships,
|
||||
time.perf_counter() - drop_t0,
|
||||
)
|
||||
result = session.run(
|
||||
f"""
|
||||
MATCH (:`{provider_label}`)-[r]-()
|
||||
WITH DISTINCT r LIMIT $batch_size
|
||||
DELETE r
|
||||
RETURN COUNT(r) AS deleted_rels_count
|
||||
""",
|
||||
{"batch_size": BATCH_SIZE},
|
||||
)
|
||||
record = result.single()
|
||||
deleted_rels = (record["deleted_rels_count"] if record else 0) or 0
|
||||
if deleted_rels == 0:
|
||||
break
|
||||
relationship_batches += 1
|
||||
deleted_relationships += deleted_rels
|
||||
logger.info(
|
||||
"Deleted relationship batch from Neptune sink "
|
||||
"(provider=%s, batch=%s, deleted_rels=%s, total_rels=%s, "
|
||||
"elapsed=%.3fs)",
|
||||
provider_id,
|
||||
relationship_batches,
|
||||
deleted_rels,
|
||||
deleted_relationships,
|
||||
time.perf_counter() - drop_t0,
|
||||
for phase, query_template in RELATIONSHIP_DELETE_QUERY_TEMPLATES.items():
|
||||
deleted_relationships, phase_batches = delete_batches(
|
||||
session=session,
|
||||
logger=logger,
|
||||
log_target="Neptune sink",
|
||||
provider_id=provider_id,
|
||||
query=query_template.format(provider_label=provider_label),
|
||||
phase=phase,
|
||||
count_key="deleted_rels_count",
|
||||
total_key="rels",
|
||||
deleted_key="deleted_rels",
|
||||
initial_total=deleted_relationships,
|
||||
batch_size=BATCH_SIZE,
|
||||
drop_t0=drop_t0,
|
||||
)
|
||||
relationship_batches += phase_batches
|
||||
|
||||
deleted_nodes = 0
|
||||
while True:
|
||||
next_batch = node_batches + 1
|
||||
logger.info(
|
||||
"Deleting node batch from Neptune sink "
|
||||
"(provider=%s, batch=%s, total_nodes=%s, elapsed=%.3fs)",
|
||||
provider_id,
|
||||
next_batch,
|
||||
deleted_nodes,
|
||||
time.perf_counter() - drop_t0,
|
||||
)
|
||||
result = session.run(
|
||||
f"""
|
||||
MATCH (n:`{PROVIDER_RESOURCE_LABEL}`:`{provider_label}`)
|
||||
WITH n LIMIT $batch_size
|
||||
DELETE n
|
||||
RETURN COUNT(n) AS deleted_nodes_count
|
||||
""",
|
||||
{"batch_size": BATCH_SIZE},
|
||||
)
|
||||
record = result.single()
|
||||
deleted = (record["deleted_nodes_count"] if record else 0) or 0
|
||||
if deleted == 0:
|
||||
break
|
||||
node_batches += 1
|
||||
deleted_nodes += deleted
|
||||
logger.info(
|
||||
"Deleted node batch from Neptune sink "
|
||||
"(provider=%s, batch=%s, deleted_nodes=%s, total_nodes=%s, "
|
||||
"elapsed=%.3fs)",
|
||||
provider_id,
|
||||
node_batches,
|
||||
deleted,
|
||||
deleted_nodes,
|
||||
time.perf_counter() - drop_t0,
|
||||
)
|
||||
deleted_nodes, node_batches = delete_batches(
|
||||
session=session,
|
||||
logger=logger,
|
||||
log_target="Neptune sink",
|
||||
provider_id=provider_id,
|
||||
query=NODE_DELETE_QUERY_TEMPLATE.format(
|
||||
provider_label=provider_label,
|
||||
provider_resource_label=PROVIDER_RESOURCE_LABEL,
|
||||
),
|
||||
phase="node",
|
||||
count_key="deleted_nodes_count",
|
||||
total_key="nodes",
|
||||
deleted_key="deleted_nodes",
|
||||
initial_total=0,
|
||||
batch_size=BATCH_SIZE,
|
||||
drop_t0=drop_t0,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Finished dropping provider graph from Neptune sink "
|
||||
|
||||
@@ -186,6 +186,25 @@ def _session_ctx(session: MagicMock) -> MagicMock:
|
||||
return ctx
|
||||
|
||||
|
||||
def _count_result(key: str, count: int) -> MagicMock:
|
||||
return MagicMock(single=MagicMock(return_value={key: count}))
|
||||
|
||||
|
||||
def _directed_drop_results(
|
||||
outgoing_rels: int,
|
||||
incoming_rels: int,
|
||||
nodes: int,
|
||||
) -> list[MagicMock]:
|
||||
return [
|
||||
_count_result("deleted_rels_count", outgoing_rels),
|
||||
_count_result("deleted_rels_count", 0),
|
||||
_count_result("deleted_rels_count", incoming_rels),
|
||||
_count_result("deleted_rels_count", 0),
|
||||
_count_result("deleted_nodes_count", nodes),
|
||||
_count_result("deleted_nodes_count", 0),
|
||||
]
|
||||
|
||||
|
||||
class TestNeo4jSinkSyncWrites:
|
||||
def test_ensure_sync_indexes_runs_create_index_idempotent(self):
|
||||
from api.attack_paths.sink.neo4j import Neo4jSink
|
||||
@@ -310,65 +329,48 @@ class TestNeptuneSinkSyncWrites:
|
||||
|
||||
|
||||
class TestNeptuneSinkDropSubgraph:
|
||||
def test_drop_subgraph_deletes_rels_before_nodes_in_bounded_batches(self):
|
||||
def test_drop_subgraph_deletes_directed_rels_before_nodes_in_bounded_batches(self):
|
||||
from api.attack_paths.sink.neptune import NeptuneSink
|
||||
|
||||
sink = NeptuneSink()
|
||||
session = MagicMock()
|
||||
|
||||
rel_record_first = MagicMock()
|
||||
rel_record_first.__getitem__ = lambda _self, key: 50
|
||||
rel_record_drain = MagicMock()
|
||||
rel_record_drain.__getitem__ = lambda _self, key: 0
|
||||
node_record_first = MagicMock()
|
||||
node_record_first.__getitem__ = lambda _self, key: 10
|
||||
node_record_drain = MagicMock()
|
||||
node_record_drain.__getitem__ = lambda _self, key: 0
|
||||
|
||||
run_results = [
|
||||
MagicMock(single=MagicMock(return_value=rel_record_first)),
|
||||
MagicMock(single=MagicMock(return_value=rel_record_drain)),
|
||||
MagicMock(single=MagicMock(return_value=node_record_first)),
|
||||
MagicMock(single=MagicMock(return_value=node_record_drain)),
|
||||
]
|
||||
session.run.side_effect = run_results
|
||||
session.run.side_effect = _directed_drop_results(
|
||||
outgoing_rels=50,
|
||||
incoming_rels=30,
|
||||
nodes=10,
|
||||
)
|
||||
|
||||
with patch.object(sink, "get_session", return_value=_session_ctx(session)):
|
||||
deleted = sink.drop_subgraph("ignored", "provider-1")
|
||||
|
||||
assert deleted == 10
|
||||
first_query = session.run.call_args_list[0].args[0]
|
||||
assert "DELETE r" in first_query
|
||||
assert "DETACH DELETE" not in first_query
|
||||
# DISTINCT avoids double-counting relationships matched from both ends.
|
||||
assert "DISTINCT r" in first_query
|
||||
third_query = session.run.call_args_list[2].args[0]
|
||||
assert "DELETE n" in third_query
|
||||
assert session.run.call_count == 6
|
||||
queries = [call.args[0] for call in session.run.call_args_list]
|
||||
|
||||
assert ")-[r]->()" in queries[0]
|
||||
assert ")<-[r]-()" in queries[2]
|
||||
assert "DELETE n" in queries[4]
|
||||
assert all("DETACH DELETE" not in query for query in queries)
|
||||
assert all("DISTINCT r" not in query for query in queries)
|
||||
|
||||
first_node = next(i for i, q in enumerate(queries) if "DELETE n" in q)
|
||||
last_rel = max(i for i, q in enumerate(queries) if "DELETE r" in q)
|
||||
assert last_rel < first_node
|
||||
|
||||
|
||||
class TestNeo4jSinkDropSubgraph:
|
||||
"""Neo4j drop deletes relationships then nodes in batches (no ``DETACH DELETE``)."""
|
||||
|
||||
def test_drop_subgraph_deletes_rels_before_nodes_in_bounded_batches(self):
|
||||
def test_drop_subgraph_deletes_directed_rels_before_nodes_in_bounded_batches(self):
|
||||
from api.attack_paths.sink.neo4j import Neo4jSink
|
||||
|
||||
sink = Neo4jSink()
|
||||
session = MagicMock()
|
||||
|
||||
rel_first = MagicMock()
|
||||
rel_first.get = lambda key, default=0: 50
|
||||
rel_drain = MagicMock()
|
||||
rel_drain.get = lambda key, default=0: 0
|
||||
node_first = MagicMock()
|
||||
node_first.get = lambda key, default=0: 10
|
||||
node_drain = MagicMock()
|
||||
node_drain.get = lambda key, default=0: 0
|
||||
session.run.side_effect = [
|
||||
MagicMock(single=MagicMock(return_value=rel_first)),
|
||||
MagicMock(single=MagicMock(return_value=rel_drain)),
|
||||
MagicMock(single=MagicMock(return_value=node_first)),
|
||||
MagicMock(single=MagicMock(return_value=node_drain)),
|
||||
]
|
||||
session.run.side_effect = _directed_drop_results(
|
||||
outgoing_rels=50,
|
||||
incoming_rels=30,
|
||||
nodes=10,
|
||||
)
|
||||
|
||||
provider_id = "00000000-0000-0000-0000-000000000abc"
|
||||
with patch.object(sink, "get_session", return_value=_session_ctx(session)):
|
||||
@@ -376,19 +378,20 @@ class TestNeo4jSinkDropSubgraph:
|
||||
|
||||
# Only phase-2 node counts contribute to the return value.
|
||||
assert deleted == 10
|
||||
assert session.run.call_count == 4
|
||||
assert session.run.call_count == 6
|
||||
|
||||
queries = [call.args[0] for call in session.run.call_args_list]
|
||||
# Regression guard: the memory blow-up was caused by DETACH DELETE.
|
||||
assert all("DETACH DELETE" not in query for query in queries)
|
||||
assert all("DISTINCT r" not in query for query in queries)
|
||||
|
||||
first_query = queries[0]
|
||||
assert "DELETE r" in first_query
|
||||
# DISTINCT avoids double-counting relationships matched from both ends.
|
||||
assert "DISTINCT r" in first_query
|
||||
assert ")-[r]->()" in first_query
|
||||
assert ":`_Provider_00000000000000000000000000000abc`" in first_query
|
||||
|
||||
assert "DELETE n" in queries[2]
|
||||
assert ")<-[r]-()" in queries[2]
|
||||
assert "DELETE n" in queries[4]
|
||||
|
||||
# Relationships must be fully drained before nodes are deleted.
|
||||
first_node = next(i for i, q in enumerate(queries) if "DELETE n" in q)
|
||||
|
||||
Reference in New Issue
Block a user