Compare commits


14 Commits

Author SHA1 Message Date
Prowler Bot
c509482954 fix(scans): scheduled scans duplicates (#9883)
Co-authored-by: Víctor Fernández Poyatos <victor@prowler.com>
2026-01-26 13:36:49 +01:00
Prowler Bot
09ee0698d4 fix(attack-paths): clear Neo4j database cache after scan and queries (#9878)
Co-authored-by: Josema Camacho <josema@prowler.com>
2026-01-23 16:26:05 +01:00
Prowler Bot
fcc106abe3 fix(attack-paths): improve findings ingestion cypher query (#9875)
Co-authored-by: Josema Camacho <josema@prowler.com>
2026-01-23 13:41:19 +01:00
Prowler Bot
b38a0358c9 fix(attack-paths): Start Neo4j at startup for API only (#9873)
Co-authored-by: Josema Camacho <josema@prowler.com>
2026-01-23 11:11:38 +01:00
Prowler Bot
ca6c85a0f1 fix(neo4j): lazy load driver (#9871)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
Co-authored-by: Josema Camacho <josema@prowler.com>
2026-01-23 06:53:42 +01:00
Prowler Bot
1340b56c21 fix(attack-paths): Use Findings.all_objects to avoid the custom manager (#9870)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-01-23 06:30:08 +01:00
Prowler Bot
81fbb90adc fix(attack-paths): load findings in batches into Neo4j (#9866)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
Co-authored-by: Josema Camacho <josema@prowler.com>
2026-01-22 18:35:02 +01:00
Prowler Bot
0b6352b32e fix: remove None databases name for removing provider Neo4j databases (#9860)
Co-authored-by: Josema Camacho <josema@prowler.com>
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-01-22 15:31:08 +01:00
Prowler Bot
be1909f752 fix(attack-paths): read findings using replica DB and add more logs (#9863)
Co-authored-by: Josema Camacho <josema@prowler.com>
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-01-22 15:06:44 +01:00
Prowler Bot
a80924cea0 fix: improve API startup process manage.py detection (#9859)
Co-authored-by: Josema Camacho <josema@prowler.com>
2026-01-22 13:44:22 +01:00
Prowler Bot
3a964286ea chore(api): Bump version to v1.18.1 (#9854)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-01-22 11:33:08 +01:00
Prowler Bot
cf658e53af chore(release): Bump version to v5.17.1 (#9851)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-01-22 11:32:40 +01:00
Prowler Bot
a20022ee9a docs: Update version to v5.17.0 (#9855)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-01-22 11:32:21 +01:00
Prowler Bot
6336137468 chore(api): Update prowler dependency to v5.17 for release 5.17.0 (#9847)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-01-21 15:36:07 +01:00
23 changed files with 1631 additions and 183 deletions

.env

@@ -66,7 +66,7 @@ NEO4J_DBMS_SECURITY_PROCEDURES_ALLOWLIST=apoc.*
NEO4J_DBMS_SECURITY_PROCEDURES_UNRESTRICTED=apoc.*
NEO4J_DBMS_CONNECTOR_BOLT_LISTEN_ADDRESS=0.0.0.0:7687
# Neo4j Prowler settings
NEO4J_INSERT_BATCH_SIZE=500
ATTACK_PATHS_FINDINGS_BATCH_SIZE=1000
# Celery-Prowler task settings
TASK_RETRY_DELAY_SECONDS=0.1


@@ -2,6 +2,21 @@
All notable changes to the **Prowler API** are documented in this file.
## [1.18.1] (Prowler v5.17.1)
### Fixed
- Improve the API startup process with `manage.py` argument detection [(#9856)](https://github.com/prowler-cloud/prowler/pull/9856)
- Deleting a provider no longer tries to delete a `None` Neo4j database when an Attack Paths scan is scheduled [(#9858)](https://github.com/prowler-cloud/prowler/pull/9858)
- Use replica database for reading Findings to add them to the Attack Paths graph [(#9861)](https://github.com/prowler-cloud/prowler/pull/9861)
- Attack Paths findings loading now streams batches through a generator, using O(batch_size) memory instead of O(total_findings) [(#9862)](https://github.com/prowler-cloud/prowler/pull/9862)
- Lazy load Neo4j driver [(#9868)](https://github.com/prowler-cloud/prowler/pull/9868)
- Use `Findings.all_objects` to avoid the `ActiveProviderPartitionedManager` [(#9869)](https://github.com/prowler-cloud/prowler/pull/9869)
- Lazy load Neo4j driver for workers only [(#9872)](https://github.com/prowler-cloud/prowler/pull/9872)
- Improve Cypher query for inserting Findings into Attack Paths scan graphs [(#9874)](https://github.com/prowler-cloud/prowler/pull/9874)
- Clear Neo4j database cache after Attack Paths scan and each API query [(#9877)](https://github.com/prowler-cloud/prowler/pull/9877)
- Deduplicated scheduled scans for long-running providers [(#9829)](https://github.com/prowler-cloud/prowler/pull/9829)
## [1.18.0] (Prowler v5.17.0)
### Added
@@ -14,12 +29,12 @@ All notable changes to the **Prowler API** are documented in this file.
- Attack Paths: `/api/v1/attack-paths-scans` for AWS providers backed by Neo4j [(#9805)](https://github.com/prowler-cloud/prowler/pull/9805)
### Security
- Django 5.1.15 (CVE-2025-64460, CVE-2025-13372), Werkzeug 3.1.4 (CVE-2025-66221), sqlparse 0.5.5 (PVE-2025-82038), fonttools 4.60.2 (CVE-2025-66034) [(#9730)](https://github.com/prowler-cloud/prowler/pull/9730)
- `safety` to `3.7.0` and `filelock` to `3.20.3` due to [Safety vulnerability 82754 (CVE-2025-68146)](https://data.safetycli.com/v/82754/97c/) [(#9816)](https://github.com/prowler-cloud/prowler/pull/9816)
- `pyasn1` to v0.6.2 to address [CVE-2026-23490](https://nvd.nist.gov/vuln/detail/CVE-2026-23490) [(#9818)](https://github.com/prowler-cloud/prowler/pull/9818)
- `django-allauth[saml]` to v65.13.0 to address [CVE-2025-65431](https://nvd.nist.gov/vuln/detail/CVE-2025-65431) [(#9575)](https://github.com/prowler-cloud/prowler/pull/9575)
---
## [1.17.1] (Prowler v5.16.1)
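
The streaming-generator fix (#9862) listed above replaces a load-everything query with keyset pagination. A minimal in-memory sketch of the pattern (hypothetical data and function name; no Django involved):

```python
from typing import Generator

def iter_batches(rows: list[dict], batch_size: int) -> Generator[list[dict], None, None]:
    """Yield ordered batches using keyset pagination on `id`, so the
    consumer never holds more than `batch_size` rows at once."""
    last_id = None
    while True:
        # Stand-in for: Finding.objects.filter(id__gt=last_id).order_by("id")[:batch_size]
        batch = sorted(
            (r for r in rows if last_id is None or r["id"] > last_id),
            key=lambda r: r["id"],
        )[:batch_size]
        if not batch:
            break
        last_id = batch[-1]["id"]
        yield batch
```

With five rows and `batch_size=2`, this yields batches of sizes 2, 2, and 1.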

api/poetry.lock (generated)

File diff suppressed because it is too large.


@@ -24,7 +24,7 @@ dependencies = [
"drf-spectacular-jsonapi==0.5.1",
"gunicorn==23.0.0",
"lxml==5.3.2",
"prowler @ git+https://github.com/prowler-cloud/prowler.git@master",
"prowler @ git+https://github.com/prowler-cloud/prowler.git@v5.17",
"psycopg2-binary==2.9.9",
"pytest-celery[redis] (>=1.0.1,<2.0.0)",
"sentry-sdk[django] (>=2.20.0,<3.0.0)",
@@ -49,7 +49,7 @@ name = "prowler-api"
package-mode = false
# Needed for the SDK compatibility
requires-python = ">=3.11,<3.13"
version = "1.18.0"
version = "1.18.1"
[project.scripts]
celery = "src.backend.config.settings.celery"


@@ -1,4 +1,3 @@
import atexit
import logging
import os
import sys
@@ -35,34 +34,45 @@ class ApiConfig(AppConfig):
from api.compliance import load_prowler_compliance
# Generate required cryptographic keys if not present, but only if:
# `"manage.py" not in sys.argv`: If an external server (e.g., Gunicorn) is running the app
# `"manage.py" not in sys.argv[0]`: If an external server (e.g., Gunicorn) is running the app
# `os.environ.get("RUN_MAIN")`: If it's not a Django command or using `runserver`,
# only the main process will do it
if "manage.py" not in sys.argv or os.environ.get("RUN_MAIN"):
if (len(sys.argv) >= 1 and "manage.py" not in sys.argv[0]) or os.environ.get(
"RUN_MAIN"
):
self._ensure_crypto_keys()
# Commands that don't need Neo4j
SKIP_NEO4J_DJANGO_COMMANDS = [
"migrate",
"makemigrations",
"migrate",
"pgpartition",
"check",
"help",
"showmigrations",
"check_and_fix_socialaccount_sites_migration",
]
# Skip Neo4j initialization during tests, some Django commands, and Celery
if getattr(settings, "TESTING", False) or (
"manage.py" in sys.argv
and len(sys.argv) > 1
and sys.argv[1] in SKIP_NEO4J_DJANGO_COMMANDS
len(sys.argv) > 1
and (
(
"manage.py" in sys.argv[0]
and sys.argv[1] in SKIP_NEO4J_DJANGO_COMMANDS
)
or "celery" in sys.argv[0]
)
):
logger.info(
"Skipping Neo4j initialization because of the current Django command or testing"
"Skipping Neo4j initialization because tests, some Django commands or Celery"
)
else:
graph_database.init_driver()
atexit.register(graph_database.close_driver)
# Neo4j driver is initialized at API startup (see api.attack_paths.database)
# It remains lazy for Celery workers and selected Django commands
load_prowler_compliance()
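
The process-detection logic in the hunk above can be sketched as a standalone predicate (simplified; the command list is a subset for illustration, and the real code reads `settings.TESTING` from Django settings):

```python
# Commands that don't need an eager Neo4j connection (illustrative subset)
SKIP_NEO4J_DJANGO_COMMANDS = ["makemigrations", "migrate", "check", "help"]

def should_skip_neo4j(argv: list[str], testing: bool = False) -> bool:
    """Mirror the startup check: skip eager Neo4j initialization for tests,
    listed manage.py commands, and Celery entrypoints."""
    if testing:
        return True
    if len(argv) > 1:
        if "manage.py" in argv[0] and argv[1] in SKIP_NEO4J_DJANGO_COMMANDS:
            return True
        if "celery" in argv[0]:
            return True
    return False
```

Under this sketch, `["manage.py", "migrate"]` and `["celery", "-A", "api"]` skip initialization, while a Gunicorn-style `argv` does not.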


@@ -1,13 +1,12 @@
import atexit
import logging
import threading
from contextlib import contextmanager
from typing import Iterator
from uuid import UUID
import neo4j
import neo4j.exceptions
from django.conf import settings
from api.attack_paths.retryable_session import RetryableSession
@@ -51,6 +50,9 @@ def init_driver() -> neo4j.Driver:
)
_driver.verify_connectivity()
# Register cleanup handler (only runs once since we're inside the _driver is None block)
atexit.register(close_driver)
return _driver
@@ -123,6 +125,17 @@ def drop_subgraph(database: str, root_node_label: str, root_node_id: str) -> int
return 0 # As there are no nodes to delete, the result is empty
def clear_cache(database: str) -> None:
query = "CALL db.clearQueryCaches()"
try:
with get_session(database) as session:
session.run(query)
except GraphDatabaseQueryException as exc:
logging.warning(f"Failed to clear query cache for database `{database}`: {exc}")
# Neo4j functions related to Prowler + Cartography
DATABASE_NAME_TEMPLATE = "db-{attack_paths_scan_id}"
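
The lazy, thread-safe initialization pattern in the hunk above (double-checked locking plus one-time `atexit` registration) can be sketched as follows; `_connect` is a placeholder for `neo4j.GraphDatabase.driver(...)` plus `verify_connectivity()`:

```python
import atexit
import threading

_driver = None
_lock = threading.Lock()

def _connect() -> object:
    # Placeholder for neo4j.GraphDatabase.driver(...) + verify_connectivity()
    return object()

def init_driver() -> object:
    """Create the driver once; later calls hit the fast path and return it."""
    global _driver
    if _driver is None:  # fast path, no lock taken
        with _lock:
            if _driver is None:  # re-check under the lock
                _driver = _connect()
                # Only reachable on first creation, so this registers once.
                atexit.register(close_driver)
    return _driver

def close_driver() -> None:
    global _driver
    if _driver is not None:
        _driver = None
```

Repeated `init_driver()` calls return the same instance; after `close_driver()`, the next call creates a fresh one.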


@@ -64,8 +64,9 @@ class RetryableSession:
return method(*args, **kwargs)
except (
neo4j.exceptions.ServiceUnavailable,
BrokenPipeError,
ConnectionResetError,
neo4j.exceptions.ServiceUnavailable,
) as exc: # pragma: no cover - depends on infra
last_exc = exc
attempt += 1
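
The retry behavior above can be sketched as a generic wrapper (the exponential backoff is an assumption for illustration; the real `RetryableSession` may use a different delay policy):

```python
import time

def run_with_retries(fn, retryable=(ConnectionResetError, BrokenPipeError),
                     max_attempts=3, base_delay=0.01):
    """Retry fn on transient connection errors, re-raising the last one."""
    last_exc = None
    for attempt in range(max_attempts):
        try:
            return fn()
        except retryable as exc:
            last_exc = exc
            time.sleep(base_delay * 2 ** attempt)  # back off between attempts
    raise last_exc

calls = {"n": 0}

def flaky():
    # Fails twice with a transient error, then succeeds on the third call.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionResetError("transient")
    return "ok"
```

Here `run_with_retries(flaky)` succeeds on the third attempt.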


@@ -1,7 +1,7 @@
openapi: 3.0.3
info:
title: Prowler API
version: 1.18.0
version: 1.18.1
description: |-
Prowler API specification.


@@ -1,10 +1,13 @@
import os
import sys
import types
from pathlib import Path
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch
import pytest
from django.conf import settings
import api
import api.apps as api_apps_module
from api.apps import (
ApiConfig,
@@ -150,3 +153,82 @@ def test_ensure_crypto_keys_skips_when_env_vars(monkeypatch, tmp_path):
# Assert: orchestrator did not trigger generation when env present
assert called["ensure"] is False
@pytest.fixture(autouse=True)
def stub_api_modules():
"""Provide dummy modules imported during ApiConfig.ready()."""
created = []
for name in ("api.schema_extensions", "api.signals"):
if name not in sys.modules:
sys.modules[name] = types.ModuleType(name)
created.append(name)
yield
for name in created:
sys.modules.pop(name, None)
def _set_argv(monkeypatch, argv):
monkeypatch.setattr(sys, "argv", argv, raising=False)
def _set_testing(monkeypatch, value):
monkeypatch.setattr(settings, "TESTING", value, raising=False)
def _make_app():
return ApiConfig("api", api)
def test_ready_initializes_driver_for_api_process(monkeypatch):
config = _make_app()
_set_argv(monkeypatch, ["gunicorn"])
_set_testing(monkeypatch, False)
with patch.object(ApiConfig, "_ensure_crypto_keys", return_value=None), patch(
"api.attack_paths.database.init_driver"
) as init_driver:
config.ready()
init_driver.assert_called_once()
def test_ready_skips_driver_for_celery(monkeypatch):
config = _make_app()
_set_argv(monkeypatch, ["celery", "-A", "api"])
_set_testing(monkeypatch, False)
with patch.object(ApiConfig, "_ensure_crypto_keys", return_value=None), patch(
"api.attack_paths.database.init_driver"
) as init_driver:
config.ready()
init_driver.assert_not_called()
def test_ready_skips_driver_for_manage_py_skip_command(monkeypatch):
config = _make_app()
_set_argv(monkeypatch, ["manage.py", "migrate"])
_set_testing(monkeypatch, False)
with patch.object(ApiConfig, "_ensure_crypto_keys", return_value=None), patch(
"api.attack_paths.database.init_driver"
) as init_driver:
config.ready()
init_driver.assert_not_called()
def test_ready_skips_driver_when_testing(monkeypatch):
config = _make_app()
_set_argv(monkeypatch, ["gunicorn"])
_set_testing(monkeypatch, True)
with patch.object(ApiConfig, "_ensure_crypto_keys", return_value=None), patch(
"api.attack_paths.database.init_driver"
) as init_driver:
config.ready()
init_driver.assert_not_called()


@@ -0,0 +1,303 @@
"""
Tests for Neo4j database lazy initialization.
The Neo4j driver connects on first use by default. API processes may
eagerly initialize the driver during app startup, while Celery workers
remain lazy. These tests validate the database module behavior itself.
"""
import threading
from unittest.mock import MagicMock, patch
import pytest
class TestLazyInitialization:
"""Test that Neo4j driver is initialized lazily on first use."""
@pytest.fixture(autouse=True)
def reset_module_state(self):
"""Reset module-level singleton state before each test."""
import api.attack_paths.database as db_module
original_driver = db_module._driver
db_module._driver = None
yield
db_module._driver = original_driver
def test_driver_not_initialized_at_import(self):
"""Driver should be None after module import (no eager connection)."""
import api.attack_paths.database as db_module
assert db_module._driver is None
@patch("api.attack_paths.database.settings")
@patch("api.attack_paths.database.neo4j.GraphDatabase.driver")
def test_init_driver_creates_connection_on_first_call(
self, mock_driver_factory, mock_settings
):
"""init_driver() should create connection only when called."""
import api.attack_paths.database as db_module
mock_driver = MagicMock()
mock_driver_factory.return_value = mock_driver
mock_settings.DATABASES = {
"neo4j": {
"HOST": "localhost",
"PORT": 7687,
"USER": "neo4j",
"PASSWORD": "password",
}
}
assert db_module._driver is None
result = db_module.init_driver()
mock_driver_factory.assert_called_once()
mock_driver.verify_connectivity.assert_called_once()
assert result is mock_driver
assert db_module._driver is mock_driver
@patch("api.attack_paths.database.settings")
@patch("api.attack_paths.database.neo4j.GraphDatabase.driver")
def test_init_driver_returns_cached_driver_on_subsequent_calls(
self, mock_driver_factory, mock_settings
):
"""Subsequent calls should return cached driver without reconnecting."""
import api.attack_paths.database as db_module
mock_driver = MagicMock()
mock_driver_factory.return_value = mock_driver
mock_settings.DATABASES = {
"neo4j": {
"HOST": "localhost",
"PORT": 7687,
"USER": "neo4j",
"PASSWORD": "password",
}
}
first_result = db_module.init_driver()
second_result = db_module.init_driver()
third_result = db_module.init_driver()
# Only one connection attempt
assert mock_driver_factory.call_count == 1
assert mock_driver.verify_connectivity.call_count == 1
# All calls return same instance
assert first_result is second_result is third_result
@patch("api.attack_paths.database.settings")
@patch("api.attack_paths.database.neo4j.GraphDatabase.driver")
def test_get_driver_delegates_to_init_driver(
self, mock_driver_factory, mock_settings
):
"""get_driver() should use init_driver() for lazy initialization."""
import api.attack_paths.database as db_module
mock_driver = MagicMock()
mock_driver_factory.return_value = mock_driver
mock_settings.DATABASES = {
"neo4j": {
"HOST": "localhost",
"PORT": 7687,
"USER": "neo4j",
"PASSWORD": "password",
}
}
result = db_module.get_driver()
assert result is mock_driver
mock_driver_factory.assert_called_once()
class TestAtexitRegistration:
"""Test that atexit cleanup handler is registered correctly."""
@pytest.fixture(autouse=True)
def reset_module_state(self):
"""Reset module-level singleton state before each test."""
import api.attack_paths.database as db_module
original_driver = db_module._driver
db_module._driver = None
yield
db_module._driver = original_driver
@patch("api.attack_paths.database.settings")
@patch("api.attack_paths.database.atexit.register")
@patch("api.attack_paths.database.neo4j.GraphDatabase.driver")
def test_atexit_registered_on_first_init(
self, mock_driver_factory, mock_atexit_register, mock_settings
):
"""atexit.register should be called on first initialization."""
import api.attack_paths.database as db_module
mock_driver_factory.return_value = MagicMock()
mock_settings.DATABASES = {
"neo4j": {
"HOST": "localhost",
"PORT": 7687,
"USER": "neo4j",
"PASSWORD": "password",
}
}
db_module.init_driver()
mock_atexit_register.assert_called_once_with(db_module.close_driver)
@patch("api.attack_paths.database.settings")
@patch("api.attack_paths.database.atexit.register")
@patch("api.attack_paths.database.neo4j.GraphDatabase.driver")
def test_atexit_registered_only_once(
self, mock_driver_factory, mock_atexit_register, mock_settings
):
"""atexit.register should only be called once across multiple inits.
The double-checked locking on _driver ensures the atexit registration
block only executes once (when _driver is first created).
"""
import api.attack_paths.database as db_module
mock_driver_factory.return_value = MagicMock()
mock_settings.DATABASES = {
"neo4j": {
"HOST": "localhost",
"PORT": 7687,
"USER": "neo4j",
"PASSWORD": "password",
}
}
db_module.init_driver()
db_module.init_driver()
db_module.init_driver()
# Only registered once because subsequent calls hit the fast path
assert mock_atexit_register.call_count == 1
class TestCloseDriver:
"""Test driver cleanup functionality."""
@pytest.fixture(autouse=True)
def reset_module_state(self):
"""Reset module-level singleton state before each test."""
import api.attack_paths.database as db_module
original_driver = db_module._driver
db_module._driver = None
yield
db_module._driver = original_driver
def test_close_driver_closes_and_clears_driver(self):
"""close_driver() should close the driver and set it to None."""
import api.attack_paths.database as db_module
mock_driver = MagicMock()
db_module._driver = mock_driver
db_module.close_driver()
mock_driver.close.assert_called_once()
assert db_module._driver is None
def test_close_driver_handles_none_driver(self):
"""close_driver() should handle case where driver is None."""
import api.attack_paths.database as db_module
db_module._driver = None
# Should not raise
db_module.close_driver()
assert db_module._driver is None
def test_close_driver_clears_driver_even_on_close_error(self):
"""Driver should be cleared even if close() raises an exception."""
import api.attack_paths.database as db_module
mock_driver = MagicMock()
mock_driver.close.side_effect = Exception("Connection error")
db_module._driver = mock_driver
with pytest.raises(Exception, match="Connection error"):
db_module.close_driver()
# Driver should still be cleared
assert db_module._driver is None
class TestThreadSafety:
"""Test thread-safe initialization."""
@pytest.fixture(autouse=True)
def reset_module_state(self):
"""Reset module-level singleton state before each test."""
import api.attack_paths.database as db_module
original_driver = db_module._driver
db_module._driver = None
yield
db_module._driver = original_driver
@patch("api.attack_paths.database.settings")
@patch("api.attack_paths.database.neo4j.GraphDatabase.driver")
def test_concurrent_init_creates_single_driver(
self, mock_driver_factory, mock_settings
):
"""Multiple threads calling init_driver() should create only one driver."""
import api.attack_paths.database as db_module
mock_driver = MagicMock()
mock_driver_factory.return_value = mock_driver
mock_settings.DATABASES = {
"neo4j": {
"HOST": "localhost",
"PORT": 7687,
"USER": "neo4j",
"PASSWORD": "password",
}
}
results = []
errors = []
def call_init():
try:
result = db_module.init_driver()
results.append(result)
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=call_init) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
assert not errors, f"Threads raised errors: {errors}"
# Only one driver created
assert mock_driver_factory.call_count == 1
# All threads got the same driver instance
assert all(r is mock_driver for r in results)
assert len(results) == 10


@@ -3867,6 +3867,7 @@ class TestAttackPathsScanViewSet:
"api.v1.views.attack_paths_views_helpers.execute_attack_paths_query",
return_value=graph_payload,
) as mock_execute,
patch("api.v1.views.graph_database.clear_cache") as mock_clear_cache,
):
response = authenticated_client.post(
reverse(
@@ -3889,6 +3890,7 @@ class TestAttackPathsScanViewSet:
query_definition,
prepared_parameters,
)
mock_clear_cache.assert_called_once_with(attack_paths_scan.graph_database)
result = response.json()["data"]
attributes = result["attributes"]
assert attributes["nodes"] == graph_payload["nodes"]
@@ -4000,6 +4002,7 @@ class TestAttackPathsScanViewSet:
"api.v1.views.attack_paths_views_helpers.execute_attack_paths_query",
return_value={"nodes": [], "relationships": []},
),
patch("api.v1.views.graph_database.clear_cache"),
):
response = authenticated_client.post(
reverse(


@@ -77,6 +77,7 @@ from rest_framework_json_api.views import RelationshipView, Response
from rest_framework_simplejwt.exceptions import InvalidToken, TokenError
from api.attack_paths import (
database as graph_database,
get_queries_for_provider,
get_query_by_id,
views_helpers as attack_paths_views_helpers,
@@ -381,7 +382,7 @@ class SchemaView(SpectacularAPIView):
def get(self, request, *args, **kwargs):
spectacular_settings.TITLE = "Prowler API"
spectacular_settings.VERSION = "1.18.0"
spectacular_settings.VERSION = "1.18.1"
spectacular_settings.DESCRIPTION = (
"Prowler API specification.\n\nThis file is auto-generated."
)
@@ -2435,6 +2436,7 @@ class AttackPathsScanViewSet(BaseRLSViewSet):
graph = attack_paths_views_helpers.execute_attack_paths_query(
attack_paths_scan, query_definition, parameters
)
graph_database.clear_cache(attack_paths_scan.graph_database)
status_code = status.HTTP_200_OK
if not graph.get("nodes"):


@@ -59,6 +59,7 @@ def start_aws_ingestion(
)
# Starting with sync functions
logger.info(f"Syncing organizations for AWS account {prowler_api_provider.uid}")
cartography_aws.organizations.sync(
neo4j_session,
{prowler_api_provider.alias: prowler_api_provider.uid},
@@ -84,13 +85,22 @@ def start_aws_ingestion(
)
if "permission_relationships" in requested_syncs:
logger.info(
f"Syncing function permission_relationships for AWS account {prowler_api_provider.uid}"
)
cartography_aws.RESOURCE_FUNCTIONS["permission_relationships"](**sync_args)
db_utils.update_attack_paths_scan_progress(attack_paths_scan, 88)
if "resourcegroupstaggingapi" in requested_syncs:
logger.info(
f"Syncing function resourcegroupstaggingapi for AWS account {prowler_api_provider.uid}"
)
cartography_aws.RESOURCE_FUNCTIONS["resourcegroupstaggingapi"](**sync_args)
db_utils.update_attack_paths_scan_progress(attack_paths_scan, 89)
logger.info(
f"Syncing ec2_iaminstanceprofile scoped analysis for AWS account {prowler_api_provider.uid}"
)
cartography_aws.run_scoped_analysis_job(
"aws_ec2_iaminstanceprofile.json",
neo4j_session,
@@ -98,6 +108,9 @@ def start_aws_ingestion(
)
db_utils.update_attack_paths_scan_progress(attack_paths_scan, 90)
logger.info(
f"Syncing lambda_ecr analysis for AWS account {prowler_api_provider.uid}"
)
cartography_aws.run_analysis_job(
"aws_lambda_ecr.json",
neo4j_session,
@@ -105,6 +118,7 @@ def start_aws_ingestion(
)
db_utils.update_attack_paths_scan_progress(attack_paths_scan, 91)
logger.info(f"Syncing metadata for AWS account {prowler_api_provider.uid}")
cartography_aws.merge_module_sync_metadata(
neo4j_session,
group_type="AWSAccount",
@@ -118,6 +132,7 @@ def start_aws_ingestion(
# Removing the added extra field
del common_job_parameters["AWS_ID"]
logger.info(f"Syncing cleanup_job for AWS account {prowler_api_provider.uid}")
cartography_aws.run_cleanup_job(
"aws_post_ingestion_principals_cleanup.json",
neo4j_session,
@@ -125,6 +140,7 @@ def start_aws_ingestion(
)
db_utils.update_attack_paths_scan_progress(attack_paths_scan, 93)
logger.info(f"Syncing analysis for AWS account {prowler_api_provider.uid}")
cartography_aws._perform_aws_analysis(
requested_syncs, neo4j_session, common_job_parameters
)


@@ -1,6 +1,7 @@
from datetime import datetime, timezone
from typing import Any
from django.db.models import Q
from cartography.config import Config as CartographyConfig
from api.db_utils import rls_transaction
@@ -153,9 +154,15 @@ def get_provider_graph_database_names(tenant_id: str, provider_id: str) -> list[
Note: For accessing the `AttackPathsScan` we need to use the `all_objects` manager because the provider is soft-deleted.
"""
with rls_transaction(tenant_id):
graph_databases_names_qs = ProwlerAPIAttackPathsScan.all_objects.filter(
provider_id=provider_id,
is_graph_database_deleted=False,
).values_list("graph_database", flat=True)
graph_databases_names_qs = (
ProwlerAPIAttackPathsScan.all_objects.filter(
~Q(graph_database=""),
graph_database__isnull=False,
provider_id=provider_id,
is_graph_database_deleted=False,
)
.values_list("graph_database", flat=True)
.distinct()
)
return list(graph_databases_names_qs)


@@ -1,18 +1,21 @@
import neo4j
from collections import defaultdict
from typing import Generator
import neo4j
from cartography.client.core.tx import run_write_query
from cartography.config import Config as CartographyConfig
from celery.utils.log import get_task_logger
from api.db_utils import rls_transaction
from api.models import Provider, ResourceFindingMapping
from config.env import env
from prowler.config import config as ProwlerConfig
from tasks.jobs.attack_paths.providers import get_node_uid_field, get_root_node_label
from api.db_router import READ_REPLICA_ALIAS
from api.db_utils import rls_transaction
from api.models import Finding, Provider, ResourceFindingMapping
from prowler.config import config as ProwlerConfig
logger = get_task_logger(__name__)
BATCH_SIZE = env.int("NEO4J_INSERT_BATCH_SIZE", 500)
BATCH_SIZE = env.int("ATTACK_PATHS_FINDINGS_BATCH_SIZE", 1000)
INDEX_STATEMENTS = [
"CREATE INDEX prowler_finding_id IF NOT EXISTS FOR (n:ProwlerFinding) ON (n.id);",
@@ -22,12 +25,18 @@ INDEX_STATEMENTS = [
]
INSERT_STATEMENT_TEMPLATE = """
MATCH (account:__ROOT_NODE_LABEL__ {id: $provider_uid})
UNWIND $findings_data AS finding_data
MATCH (account:__ROOT_NODE_LABEL__ {id: $provider_uid})
MATCH (account)-->(resource)
WHERE resource.__NODE_UID_FIELD__ = finding_data.resource_uid
OR resource.id = finding_data.resource_uid
OPTIONAL MATCH (account)-->(resource_by_uid)
WHERE resource_by_uid.__NODE_UID_FIELD__ = finding_data.resource_uid
WITH account, finding_data, resource_by_uid
OPTIONAL MATCH (account)-->(resource_by_id)
WHERE resource_by_uid IS NULL
AND resource_by_id.id = finding_data.resource_uid
WITH account, finding_data, COALESCE(resource_by_uid, resource_by_id) AS resource
WHERE resource IS NOT NULL
MERGE (finding:ProwlerFinding {id: finding_data.id})
ON CREATE SET
@@ -83,9 +92,8 @@ def create_indexes(neo4j_session: neo4j.Session) -> None:
Code based on Cartography version 0.122.0, specifically on `cartography.intel.create_indexes.run`.
"""
logger.info("Creating indexes for Prowler node types.")
logger.info("Creating indexes for Prowler Findings node types")
for statement in INDEX_STATEMENTS:
logger.debug("Executing statement: %s", statement)
run_write_query(neo4j_session, statement)
@@ -103,58 +111,131 @@ def analysis(
def get_provider_last_scan_findings(
prowler_api_provider: Provider,
scan_id: str,
) -> list[dict[str, str]]:
with rls_transaction(prowler_api_provider.tenant_id):
resource_finding_qs = ResourceFindingMapping.objects.filter(
finding__scan_id=scan_id,
).values(
"resource__uid",
"finding__id",
"finding__uid",
"finding__inserted_at",
"finding__updated_at",
"finding__first_seen_at",
"finding__scan_id",
"finding__delta",
"finding__status",
"finding__status_extended",
"finding__severity",
"finding__check_id",
"finding__check_metadata__checktitle",
"finding__muted",
"finding__muted_reason",
)
) -> Generator[list[dict[str, str]], None, None]:
"""
Generator that yields batches of finding-resource pairs.
findings = []
for resource_finding in resource_finding_qs:
findings.append(
Two-step query approach per batch:
1. Paginate findings for scan (single table, indexed by scan_id)
2. Batch-fetch resource UIDs via mapping table (single join)
3. Merge and yield flat structure for Neo4j
Memory efficient: never holds more than BATCH_SIZE findings in memory.
"""
logger.info(
f"Starting findings fetch for scan {scan_id} (tenant {prowler_api_provider.tenant_id}) with batch size {BATCH_SIZE}"
)
iteration = 0
last_id = None
while True:
iteration += 1
with rls_transaction(prowler_api_provider.tenant_id, using=READ_REPLICA_ALIAS):
# Use all_objects to avoid the ActiveProviderManager's implicit JOIN
# through Scan -> Provider (to check is_deleted=False).
# The provider is already validated as active in this context.
qs = Finding.all_objects.filter(scan_id=scan_id).order_by("id")
if last_id is not None:
qs = qs.filter(id__gt=last_id)
findings_batch = list(
qs.values(
"id",
"uid",
"inserted_at",
"updated_at",
"first_seen_at",
"scan_id",
"delta",
"status",
"status_extended",
"severity",
"check_id",
"check_metadata__checktitle",
"muted",
"muted_reason",
)[:BATCH_SIZE]
)
logger.info(
f"Iteration #{iteration} fetched {len(findings_batch)} findings"
)
if not findings_batch:
logger.info(
f"No findings returned for iteration #{iteration}; stopping pagination"
)
break
last_id = findings_batch[-1]["id"]
enriched_batch = _enrich_and_flatten_batch(findings_batch)
# Yield outside the transaction
if enriched_batch:
yield enriched_batch
logger.info(f"Finished fetching findings for scan {scan_id}")
def _enrich_and_flatten_batch(
findings_batch: list[dict],
) -> list[dict[str, str]]:
"""
Fetch resource UIDs for a batch of findings and return flat structure.
One finding with 3 resources becomes 3 dicts (same output format as before).
Must be called within an RLS transaction context.
"""
finding_ids = [f["id"] for f in findings_batch]
# Single join: mapping -> resource
resource_mappings = ResourceFindingMapping.objects.filter(
finding_id__in=finding_ids
).values_list("finding_id", "resource__uid")
# Build finding_id -> [resource_uids] mapping
finding_resources = defaultdict(list)
for finding_id, resource_uid in resource_mappings:
finding_resources[finding_id].append(resource_uid)
# Flatten: one dict per (finding, resource) pair
results = []
for f in findings_batch:
resource_uids = finding_resources.get(f["id"], [])
if not resource_uids:
continue
for resource_uid in resource_uids:
results.append(
{
"resource_uid": str(resource_finding["resource__uid"]),
"id": str(resource_finding["finding__id"]),
"uid": resource_finding["finding__uid"],
"inserted_at": resource_finding["finding__inserted_at"],
"updated_at": resource_finding["finding__updated_at"],
"first_seen_at": resource_finding["finding__first_seen_at"],
"scan_id": str(resource_finding["finding__scan_id"]),
"delta": resource_finding["finding__delta"],
"status": resource_finding["finding__status"],
"status_extended": resource_finding["finding__status_extended"],
"severity": resource_finding["finding__severity"],
"check_id": str(resource_finding["finding__check_id"]),
"check_title": resource_finding[
"finding__check_metadata__checktitle"
],
"muted": resource_finding["finding__muted"],
"muted_reason": resource_finding["finding__muted_reason"],
"resource_uid": str(resource_uid),
"id": str(f["id"]),
"uid": f["uid"],
"inserted_at": f["inserted_at"],
"updated_at": f["updated_at"],
"first_seen_at": f["first_seen_at"],
"scan_id": str(f["scan_id"]),
"delta": f["delta"],
"status": f["status"],
"status_extended": f["status_extended"],
"severity": f["severity"],
"check_id": str(f["check_id"]),
"check_title": f["check_metadata__checktitle"],
"muted": f["muted"],
"muted_reason": f["muted_reason"],
}
)
return findings
return results
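
The enrichment step above can be illustrated with a standalone sketch (hypothetical data): each finding expands to one output row per mapped resource, and findings with no mapped resources are dropped.

```python
from collections import defaultdict

def flatten(findings: list[dict], mappings: list[tuple]) -> list[dict]:
    """One output dict per (finding, resource) pair."""
    by_finding = defaultdict(list)
    for finding_id, resource_uid in mappings:
        by_finding[finding_id].append(resource_uid)
    return [
        {"id": f["id"], "resource_uid": uid}
        for f in findings
        for uid in by_finding.get(f["id"], [])
    ]
```

With findings 1 and 2 and mappings `[(1, "a"), (1, "b")]`, finding 1 expands to two rows and finding 2 is dropped.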
def load_findings(
neo4j_session: neo4j.Session,
findings_data: list[dict[str, str]],
findings_batches: Generator[list[dict[str, str]], None, None],
prowler_api_provider: Provider,
config: CartographyConfig,
) -> None:
@@ -172,16 +253,20 @@ def load_findings(
"prowler_version": ProwlerConfig.prowler_version,
}
total_length = len(findings_data)
for i in range(0, total_length, BATCH_SIZE):
parameters["findings_data"] = findings_data[i : i + BATCH_SIZE]
batch_num = 0
total_records = 0
for batch in findings_batches:
batch_num += 1
batch_size = len(batch)
total_records += batch_size
logger.info(
f"Loading findings batch {i // BATCH_SIZE + 1} / {(total_length + BATCH_SIZE - 1) // BATCH_SIZE}"
)
parameters["findings_data"] = batch
logger.info(f"Loading findings batch {batch_num} ({batch_size} records)")
neo4j_session.run(query, parameters)
logger.info(f"Finished loading {total_records} records in {batch_num} batches")
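The new `load_findings` consumes a generator of batches rather than slicing one in-memory list. A self-contained sketch of that consumption loop; the `run` callable is a hypothetical stand-in for `neo4j_session.run`, which this sketch does not reproduce:

```python
from typing import Callable, Iterable


def load_in_batches(
    run: Callable[[list], None], findings_batches: Iterable[list]
) -> tuple[int, int]:
    """Drive one run() call per batch; return (batch_count, record_count).

    Mirrors the counting structure of load_findings above: an empty
    generator results in zero run() calls, so no query is issued.
    """
    batch_num = 0
    total_records = 0
    for batch in findings_batches:
        batch_num += 1
        total_records += len(batch)
        run(batch)  # in the real task: neo4j_session.run(query, parameters)
    return batch_num, total_records
```

This is why the empty-generator test below can assert `mock_session.run.assert_not_called()`.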
def cleanup_findings(
neo4j_session: neo4j.Session,


@@ -117,17 +117,31 @@ def run(tenant_id: str, scan_id: str, task_id: str) -> dict[str, Any]:
)
# Post-processing: kept only to stay Cartography-compliant
logger.info(
f"Syncing Cartography ontology for AWS account {prowler_api_provider.uid}"
)
cartography_ontology.run(neo4j_session, cartography_config)
db_utils.update_attack_paths_scan_progress(attack_paths_scan, 95)
logger.info(
f"Syncing Cartography analysis for AWS account {prowler_api_provider.uid}"
)
cartography_analysis.run(neo4j_session, cartography_config)
db_utils.update_attack_paths_scan_progress(attack_paths_scan, 96)
# Adding Prowler nodes and relationships
logger.info(
f"Syncing Prowler analysis for AWS account {prowler_api_provider.uid}"
)
prowler.analysis(
neo4j_session, prowler_api_provider, scan_id, cartography_config
)
logger.info(
f"Clearing Neo4j cache for database {cartography_config.neo4j_database}"
)
graph_database.clear_cache(cartography_config.neo4j_database)
logger.info(
f"Completed Cartography ({attack_paths_scan.id}) for "
f"{prowler_api_provider.provider.upper()} provider {prowler_api_provider.id}"


@@ -1,25 +1,13 @@
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
from shutil import rmtree
from celery import chain, group, shared_task
from celery.utils.log import get_task_logger
from django_celery_beat.models import PeriodicTask
from api.compliance import get_compliance_frameworks
from api.db_router import READ_REPLICA_ALIAS
from api.db_utils import rls_transaction
from api.decorators import handle_provider_deletion, set_tenant
from api.models import Finding, Integration, Provider, Scan, ScanSummary, StateChoices
from api.utils import initialize_prowler_provider
from api.v1.serializers import ScanTaskSerializer
from config.celery import RLSTask
from config.django.base import DJANGO_FINDINGS_BATCH_SIZE, DJANGO_TMP_OUTPUT_DIRECTORY
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.outputs.compliance.generic.generic import GenericCompliance
from prowler.lib.outputs.finding import Finding as FindingOutput
from django_celery_beat.models import PeriodicTask
from tasks.jobs.attack_paths import (
attack_paths_scan,
can_provider_run_attack_paths_scan,
@@ -64,7 +52,22 @@ from tasks.jobs.scan import (
perform_prowler_scan,
update_provider_compliance_scores,
)
from tasks.utils import batched, get_next_execution_datetime
from tasks.utils import (
_get_or_create_scheduled_scan,
batched,
get_next_execution_datetime,
)
from api.compliance import get_compliance_frameworks
from api.db_router import READ_REPLICA_ALIAS
from api.db_utils import rls_transaction
from api.decorators import handle_provider_deletion, set_tenant
from api.models import Finding, Integration, Provider, Scan, ScanSummary, StateChoices
from api.utils import initialize_prowler_provider
from api.v1.serializers import ScanTaskSerializer
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.outputs.compliance.generic.generic import GenericCompliance
from prowler.lib.outputs.finding import Finding as FindingOutput
logger = get_task_logger(__name__)
@@ -275,44 +278,38 @@ def perform_scheduled_scan_task(self, tenant_id: str, provider_id: str):
periodic_task_instance = PeriodicTask.objects.get(
name=f"scan-perform-scheduled-{provider_id}"
)
executed_scan = Scan.objects.filter(
tenant_id=tenant_id,
provider_id=provider_id,
task__task_runner_task__task_id=task_id,
).order_by("completed_at")
if (
executing_scan = (
Scan.objects.filter(
tenant_id=tenant_id,
provider_id=provider_id,
trigger=Scan.TriggerChoices.SCHEDULED,
state=StateChoices.EXECUTING,
scheduler_task_id=periodic_task_instance.id,
scheduled_at__date=datetime.now(timezone.utc).date(),
).exists()
or executed_scan.exists()
):
# Duplicated task execution due to visibility timeout or scan is already running
logger.warning(f"Duplicated scheduled scan for provider {provider_id}.")
try:
affected_scan = executed_scan.first()
if not affected_scan:
raise ValueError(
"Error retrieving affected scan details after detecting duplicated scheduled "
"scan."
)
# Return the affected scan details to avoid losing data
serializer = ScanTaskSerializer(instance=affected_scan)
except Exception as duplicated_scan_exception:
logger.error(
f"Duplicated scheduled scan for provider {provider_id}. Error retrieving affected scan details: "
f"{str(duplicated_scan_exception)}"
)
raise duplicated_scan_exception
return serializer.data
)
.order_by("-started_at")
.first()
)
if executing_scan:
logger.warning(
f"Scheduled scan already executing for provider {provider_id}. Skipping."
)
return ScanTaskSerializer(instance=executing_scan).data
executed_scan = Scan.objects.filter(
tenant_id=tenant_id,
provider_id=provider_id,
task__task_runner_task__task_id=task_id,
).first()
if executed_scan:
# Duplicated task execution due to visibility timeout
logger.warning(f"Duplicated scheduled scan for provider {provider_id}.")
return ScanTaskSerializer(instance=executed_scan).data
interval = periodic_task_instance.interval
next_scan_datetime = get_next_execution_datetime(task_id, provider_id)
current_scan_datetime = next_scan_datetime - timedelta(
**{interval.period: interval.every}
)
# TEMPORARY WORKAROUND: Clean up orphan scans from a transaction isolation issue
_cleanup_orphan_scheduled_scans(
@@ -321,19 +318,12 @@ def perform_scheduled_scan_task(self, tenant_id: str, provider_id: str):
scheduler_task_id=periodic_task_instance.id,
)
scan_instance, _ = Scan.objects.get_or_create(
scan_instance = _get_or_create_scheduled_scan(
tenant_id=tenant_id,
provider_id=provider_id,
trigger=Scan.TriggerChoices.SCHEDULED,
state__in=(StateChoices.SCHEDULED, StateChoices.AVAILABLE),
scheduler_task_id=periodic_task_instance.id,
defaults={
"state": StateChoices.SCHEDULED,
"name": "Daily scheduled scan",
"scheduled_at": next_scan_datetime - timedelta(days=1),
},
scheduled_at=current_scan_datetime,
)
scan_instance.task_id = task_id
scan_instance.save()
@@ -343,18 +333,19 @@ def perform_scheduled_scan_task(self, tenant_id: str, provider_id: str):
scan_id=str(scan_instance.id),
provider_id=provider_id,
)
except Exception as e:
raise e
finally:
with rls_transaction(tenant_id):
Scan.objects.get_or_create(
now = datetime.now(timezone.utc)
if next_scan_datetime <= now:
interval_delta = timedelta(**{interval.period: interval.every})
while next_scan_datetime <= now:
next_scan_datetime += interval_delta
_get_or_create_scheduled_scan(
tenant_id=tenant_id,
name="Daily scheduled scan",
provider_id=provider_id,
trigger=Scan.TriggerChoices.SCHEDULED,
state=StateChoices.SCHEDULED,
scheduled_at=next_scan_datetime,
scheduler_task_id=periodic_task_instance.id,
scheduled_at=next_scan_datetime,
update_state=True,
)
_perform_scan_complete_tasks(tenant_id, str(scan_instance.id), provider_id)
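The `finally` block above rolls `next_scan_datetime` forward whenever the computed value already lies in the past (for example after a long-running scan). A minimal sketch of that catch-up loop, assuming the interval has been reduced to a plain `timedelta`:

```python
from datetime import datetime, timedelta, timezone


def roll_forward(next_dt: datetime, interval: timedelta, now: datetime) -> datetime:
    """Advance next_dt in whole interval steps until it is strictly in the future."""
    while next_dt <= now:
        next_dt += interval
    return next_dt
```

Stepping in whole intervals (rather than jumping straight to `now + interval`) keeps the schedule aligned to its original cadence.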


@@ -3,6 +3,8 @@ from types import SimpleNamespace
from unittest.mock import MagicMock, call, patch
import pytest
from tasks.jobs.attack_paths import prowler as prowler_module
from tasks.jobs.attack_paths.scan import run as attack_paths_run
from api.models import (
AttackPathsScan,
@@ -15,8 +17,6 @@ from api.models import (
StatusChoices,
)
from prowler.lib.check.models import Severity
from tasks.jobs.attack_paths import prowler as prowler_module
from tasks.jobs.attack_paths.scan import run as attack_paths_run
@pytest.mark.django_db
@@ -68,6 +68,7 @@ class TestAttackPathsRun:
"tasks.jobs.attack_paths.scan.graph_database.get_session",
return_value=session_ctx,
) as mock_get_session,
patch("tasks.jobs.attack_paths.scan.graph_database.clear_cache"),
patch(
"tasks.jobs.attack_paths.scan.cartography_create_indexes.run"
) as mock_cartography_indexes,
@@ -276,15 +277,15 @@ class TestAttackPathsProwlerHelpers:
provider.provider = Provider.ProviderChoices.AWS
provider.save()
findings = [
{"id": "1", "resource_uid": "r-1"},
{"id": "2", "resource_uid": "r-2"},
]
# Create a generator that yields two batches
def findings_generator():
yield [{"id": "1", "resource_uid": "r-1"}]
yield [{"id": "2", "resource_uid": "r-2"}]
config = SimpleNamespace(update_tag=12345)
mock_session = MagicMock()
with (
patch.object(prowler_module, "BATCH_SIZE", 1),
patch(
"tasks.jobs.attack_paths.prowler.get_root_node_label",
return_value="AWSAccount",
@@ -294,7 +295,9 @@ class TestAttackPathsProwlerHelpers:
return_value="arn",
),
):
prowler_module.load_findings(mock_session, findings, provider, config)
prowler_module.load_findings(
mock_session, findings_generator(), provider, config
)
assert mock_session.run.call_count == 2
for call_args in mock_session.run.call_args_list:
@@ -402,11 +405,18 @@ class TestAttackPathsProwlerHelpers:
with patch(
"tasks.jobs.attack_paths.prowler.rls_transaction",
new=lambda *args, **kwargs: nullcontext(),
), patch(
"tasks.jobs.attack_paths.prowler.READ_REPLICA_ALIAS",
"default",
):
findings_data = prowler_module.get_provider_last_scan_findings(
# The generator yields batches; collect all findings from them
findings_batches = prowler_module.get_provider_last_scan_findings(
provider,
str(latest_scan.id),
)
findings_data = []
for batch in findings_batches:
findings_data.extend(batch)
assert len(findings_data) == 1
finding_dict = findings_data[0]
@@ -414,3 +424,285 @@ class TestAttackPathsProwlerHelpers:
assert finding_dict["resource_uid"] == resource.uid
assert finding_dict["check_title"] == "Check title"
assert finding_dict["scan_id"] == str(latest_scan.id)
def test_enrich_and_flatten_batch_single_resource(
self,
tenants_fixture,
providers_fixture,
):
"""One finding + one resource = one output dict"""
tenant = tenants_fixture[0]
provider = providers_fixture[0]
provider.provider = Provider.ProviderChoices.AWS
provider.save()
resource = Resource.objects.create(
tenant_id=tenant.id,
provider=provider,
uid="resource-uid-1",
name="Resource 1",
region="us-east-1",
service="ec2",
type="instance",
)
scan = Scan.objects.create(
name="Test Scan",
provider=provider,
trigger=Scan.TriggerChoices.MANUAL,
state=StateChoices.COMPLETED,
tenant_id=tenant.id,
)
finding = Finding.objects.create(
tenant_id=tenant.id,
uid="finding-uid",
scan=scan,
delta=Finding.DeltaChoices.NEW,
status=StatusChoices.FAIL,
status_extended="failed",
severity=Severity.high,
impact=Severity.high,
impact_extended="",
raw_result={},
check_id="check-1",
check_metadata={"checktitle": "Check title"},
first_seen_at=scan.inserted_at,
)
ResourceFindingMapping.objects.create(
tenant_id=tenant.id,
resource=resource,
finding=finding,
)
# Simulate the dict returned by .values()
finding_dict = {
"id": finding.id,
"uid": finding.uid,
"inserted_at": finding.inserted_at,
"updated_at": finding.updated_at,
"first_seen_at": finding.first_seen_at,
"scan_id": scan.id,
"delta": finding.delta,
"status": finding.status,
"status_extended": finding.status_extended,
"severity": finding.severity,
"check_id": finding.check_id,
"check_metadata__checktitle": finding.check_metadata["checktitle"],
"muted": finding.muted,
"muted_reason": finding.muted_reason,
}
# _enrich_and_flatten_batch queries ResourceFindingMapping directly
# No RLS mock needed - test DB doesn't enforce RLS policies
with patch(
"tasks.jobs.attack_paths.prowler.READ_REPLICA_ALIAS",
"default",
):
result = prowler_module._enrich_and_flatten_batch([finding_dict])
assert len(result) == 1
assert result[0]["resource_uid"] == resource.uid
assert result[0]["id"] == str(finding.id)
assert result[0]["status"] == "FAIL"
def test_enrich_and_flatten_batch_multiple_resources(
self,
tenants_fixture,
providers_fixture,
):
"""One finding + three resources = three output dicts"""
tenant = tenants_fixture[0]
provider = providers_fixture[0]
provider.provider = Provider.ProviderChoices.AWS
provider.save()
resources = []
for i in range(3):
resource = Resource.objects.create(
tenant_id=tenant.id,
provider=provider,
uid=f"resource-uid-{i}",
name=f"Resource {i}",
region="us-east-1",
service="ec2",
type="instance",
)
resources.append(resource)
scan = Scan.objects.create(
name="Test Scan",
provider=provider,
trigger=Scan.TriggerChoices.MANUAL,
state=StateChoices.COMPLETED,
tenant_id=tenant.id,
)
finding = Finding.objects.create(
tenant_id=tenant.id,
uid="finding-uid",
scan=scan,
delta=Finding.DeltaChoices.NEW,
status=StatusChoices.FAIL,
status_extended="failed",
severity=Severity.high,
impact=Severity.high,
impact_extended="",
raw_result={},
check_id="check-1",
check_metadata={"checktitle": "Check title"},
first_seen_at=scan.inserted_at,
)
# Map finding to all 3 resources
for resource in resources:
ResourceFindingMapping.objects.create(
tenant_id=tenant.id,
resource=resource,
finding=finding,
)
finding_dict = {
"id": finding.id,
"uid": finding.uid,
"inserted_at": finding.inserted_at,
"updated_at": finding.updated_at,
"first_seen_at": finding.first_seen_at,
"scan_id": scan.id,
"delta": finding.delta,
"status": finding.status,
"status_extended": finding.status_extended,
"severity": finding.severity,
"check_id": finding.check_id,
"check_metadata__checktitle": finding.check_metadata["checktitle"],
"muted": finding.muted,
"muted_reason": finding.muted_reason,
}
# _enrich_and_flatten_batch queries ResourceFindingMapping directly
# No RLS mock needed - test DB doesn't enforce RLS policies
with patch(
"tasks.jobs.attack_paths.prowler.READ_REPLICA_ALIAS",
"default",
):
result = prowler_module._enrich_and_flatten_batch([finding_dict])
assert len(result) == 3
result_resource_uids = {r["resource_uid"] for r in result}
assert result_resource_uids == {r.uid for r in resources}
# All should have same finding data
for r in result:
assert r["id"] == str(finding.id)
assert r["status"] == "FAIL"
def test_enrich_and_flatten_batch_no_resources_skips(
self,
tenants_fixture,
providers_fixture,
):
"""Finding without resources should be skipped"""
tenant = tenants_fixture[0]
provider = providers_fixture[0]
provider.provider = Provider.ProviderChoices.AWS
provider.save()
scan = Scan.objects.create(
name="Test Scan",
provider=provider,
trigger=Scan.TriggerChoices.MANUAL,
state=StateChoices.COMPLETED,
tenant_id=tenant.id,
)
finding = Finding.objects.create(
tenant_id=tenant.id,
uid="orphan-finding",
scan=scan,
delta=Finding.DeltaChoices.NEW,
status=StatusChoices.FAIL,
status_extended="failed",
severity=Severity.high,
impact=Severity.high,
impact_extended="",
raw_result={},
check_id="check-1",
check_metadata={"checktitle": "Check title"},
first_seen_at=scan.inserted_at,
)
# Note: No ResourceFindingMapping created
finding_dict = {
"id": finding.id,
"uid": finding.uid,
"inserted_at": finding.inserted_at,
"updated_at": finding.updated_at,
"first_seen_at": finding.first_seen_at,
"scan_id": scan.id,
"delta": finding.delta,
"status": finding.status,
"status_extended": finding.status_extended,
"severity": finding.severity,
"check_id": finding.check_id,
"check_metadata__checktitle": finding.check_metadata["checktitle"],
"muted": finding.muted,
"muted_reason": finding.muted_reason,
}
# Mock logger to verify no warning is emitted
with (
patch(
"tasks.jobs.attack_paths.prowler.READ_REPLICA_ALIAS",
"default",
),
patch("tasks.jobs.attack_paths.prowler.logger") as mock_logger,
):
result = prowler_module._enrich_and_flatten_batch([finding_dict])
assert len(result) == 0
mock_logger.warning.assert_not_called()
def test_generator_is_lazy(self, providers_fixture):
"""Generator should not execute queries until iterated"""
provider = providers_fixture[0]
provider.provider = Provider.ProviderChoices.AWS
provider.save()
scan_id = "some-scan-id"
with (
patch("tasks.jobs.attack_paths.prowler.rls_transaction") as mock_rls,
patch("tasks.jobs.attack_paths.prowler.Finding") as mock_finding,
):
# Create generator but don't iterate
prowler_module.get_provider_last_scan_findings(provider, scan_id)
# Nothing should be called yet
mock_rls.assert_not_called()
mock_finding.objects.filter.assert_not_called()
def test_load_findings_empty_generator(self, providers_fixture):
"""Empty generator should not call neo4j"""
provider = providers_fixture[0]
provider.provider = Provider.ProviderChoices.AWS
provider.save()
mock_session = MagicMock()
config = SimpleNamespace(update_tag=12345)
def empty_gen():
return
yield # Make it a generator
with (
patch(
"tasks.jobs.attack_paths.prowler.get_root_node_label",
return_value="AWSAccount",
),
patch(
"tasks.jobs.attack_paths.prowler.get_node_uid_field",
return_value="arn",
),
):
prowler_module.load_findings(mock_session, empty_gen(), provider, config)
mock_session.run.assert_not_called()
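The two tests above lean on generator properties worth spelling out: a generator function's body does not run until first iteration, and a bare `return` placed before an (unreachable) `yield` still makes the function a generator. A quick standalone illustration:

```python
calls = []


def tracked():
    calls.append("ran")
    yield 1


gen = tracked()        # creating the generator object runs nothing
assert calls == []
assert next(gen) == 1  # the body executes only on first iteration
assert calls == ["ran"]


def empty_gen():
    return
    yield  # unreachable, but its presence makes this a generator function


assert list(empty_gen()) == []
```

That first property is exactly what `test_generator_is_lazy` asserts via `mock_rls.assert_not_called()`.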


@@ -1,21 +1,13 @@
import uuid
from contextlib import contextmanager
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch
import openai
import pytest
from botocore.exceptions import ClientError
from django_celery_beat.models import IntervalSchedule, PeriodicTask
from api.models import (
Integration,
LighthouseProviderConfiguration,
LighthouseProviderModels,
Scan,
StateChoices,
)
from django_celery_results.models import TaskResult
from tasks.jobs.lighthouse_providers import (
_create_bedrock_client,
_extract_bedrock_credentials,
@@ -27,11 +19,21 @@ from tasks.tasks import (
check_lighthouse_provider_connection_task,
generate_outputs_task,
perform_attack_paths_scan_task,
perform_scheduled_scan_task,
refresh_lighthouse_provider_models_task,
s3_integration_task,
security_hub_integration_task,
)
from api.models import (
Integration,
LighthouseProviderConfiguration,
LighthouseProviderModels,
Scan,
StateChoices,
Task,
)
@pytest.mark.django_db
class TestExtractBedrockCredentials:
@@ -2137,3 +2139,215 @@ class TestCleanupOrphanScheduledScans:
assert not Scan.objects.filter(id=orphan_scan.id).exists()
assert Scan.objects.filter(id=scheduled_scan.id).exists()
assert Scan.objects.filter(id=available_scan_other_task.id).exists()
@pytest.mark.django_db
class TestPerformScheduledScanTask:
"""Unit tests for perform_scheduled_scan_task."""
@staticmethod
@contextmanager
def _override_task_request(task, **attrs):
request = task.request
sentinel = object()
previous = {key: getattr(request, key, sentinel) for key in attrs}
for key, value in attrs.items():
setattr(request, key, value)
try:
yield
finally:
for key, prev in previous.items():
if prev is sentinel:
if hasattr(request, key):
delattr(request, key)
else:
setattr(request, key, prev)
def _create_periodic_task(self, provider_id, tenant_id, interval_hours=24):
interval, _ = IntervalSchedule.objects.get_or_create(
every=interval_hours, period="hours"
)
return PeriodicTask.objects.create(
name=f"scan-perform-scheduled-{provider_id}",
task="scan-perform-scheduled",
interval=interval,
kwargs=f'{{"tenant_id": "{tenant_id}", "provider_id": "{provider_id}"}}',
enabled=True,
)
def _create_task_result(self, tenant_id, task_id):
task_result = TaskResult.objects.create(
task_id=task_id,
task_name="scan-perform-scheduled",
status="STARTED",
date_created=datetime.now(timezone.utc),
)
Task.objects.create(
id=task_id, task_runner_task=task_result, tenant_id=tenant_id
)
return task_result
def test_skip_when_scheduled_scan_executing(
self, tenants_fixture, providers_fixture
):
"""Skip a scheduled run when another scheduled scan is already executing."""
tenant = tenants_fixture[0]
provider = providers_fixture[0]
periodic_task = self._create_periodic_task(provider.id, tenant.id)
task_id = str(uuid.uuid4())
self._create_task_result(tenant.id, task_id)
executing_scan = Scan.objects.create(
tenant_id=tenant.id,
provider=provider,
name="Daily scheduled scan",
trigger=Scan.TriggerChoices.SCHEDULED,
state=StateChoices.EXECUTING,
scheduler_task_id=periodic_task.id,
)
with (
patch("tasks.tasks.perform_prowler_scan") as mock_scan,
patch("tasks.tasks._perform_scan_complete_tasks") as mock_complete_tasks,
self._override_task_request(perform_scheduled_scan_task, id=task_id),
):
result = perform_scheduled_scan_task.run(
tenant_id=str(tenant.id), provider_id=str(provider.id)
)
mock_scan.assert_not_called()
mock_complete_tasks.assert_not_called()
assert result["id"] == str(executing_scan.id)
assert result["state"] == StateChoices.EXECUTING
assert (
Scan.objects.filter(
tenant_id=tenant.id,
provider=provider,
trigger=Scan.TriggerChoices.SCHEDULED,
state=StateChoices.SCHEDULED,
).count()
== 0
)
def test_creates_next_scheduled_scan_after_completion(
self, tenants_fixture, providers_fixture
):
"""Create a next scheduled scan after a successful run completes."""
tenant = tenants_fixture[0]
provider = providers_fixture[0]
self._create_periodic_task(provider.id, tenant.id)
task_id = str(uuid.uuid4())
self._create_task_result(tenant.id, task_id)
def _complete_scan(tenant_id, scan_id, provider_id):
other_scheduled = Scan.objects.filter(
tenant_id=tenant_id,
provider_id=provider_id,
trigger=Scan.TriggerChoices.SCHEDULED,
state=StateChoices.SCHEDULED,
).exclude(id=scan_id)
assert not other_scheduled.exists()
scan_instance = Scan.objects.get(id=scan_id)
scan_instance.state = StateChoices.COMPLETED
scan_instance.save()
return {"status": "ok"}
with (
patch("tasks.tasks.perform_prowler_scan", side_effect=_complete_scan),
patch("tasks.tasks._perform_scan_complete_tasks"),
self._override_task_request(perform_scheduled_scan_task, id=task_id),
):
perform_scheduled_scan_task.run(
tenant_id=str(tenant.id), provider_id=str(provider.id)
)
scheduled_scans = Scan.objects.filter(
tenant_id=tenant.id,
provider=provider,
trigger=Scan.TriggerChoices.SCHEDULED,
state=StateChoices.SCHEDULED,
)
assert scheduled_scans.count() == 1
assert scheduled_scans.first().scheduled_at > datetime.now(timezone.utc)
assert (
Scan.objects.filter(
tenant_id=tenant.id,
provider=provider,
trigger=Scan.TriggerChoices.SCHEDULED,
state__in=(StateChoices.SCHEDULED, StateChoices.AVAILABLE),
).count()
== 1
)
assert (
Scan.objects.filter(
tenant_id=tenant.id,
provider=provider,
trigger=Scan.TriggerChoices.SCHEDULED,
state=StateChoices.COMPLETED,
).count()
== 1
)
def test_dedupes_multiple_scheduled_scans_before_run(
self, tenants_fixture, providers_fixture
):
"""Ensure duplicated scheduled scans are removed before executing."""
tenant = tenants_fixture[0]
provider = providers_fixture[0]
periodic_task = self._create_periodic_task(provider.id, tenant.id)
task_id = str(uuid.uuid4())
self._create_task_result(tenant.id, task_id)
scheduled_scan = Scan.objects.create(
tenant_id=tenant.id,
provider=provider,
name="Daily scheduled scan",
trigger=Scan.TriggerChoices.SCHEDULED,
state=StateChoices.SCHEDULED,
scheduled_at=datetime.now(timezone.utc),
scheduler_task_id=periodic_task.id,
)
duplicate_scan = Scan.objects.create(
tenant_id=tenant.id,
provider=provider,
name="Daily scheduled scan",
trigger=Scan.TriggerChoices.SCHEDULED,
state=StateChoices.AVAILABLE,
scheduled_at=scheduled_scan.scheduled_at,
scheduler_task_id=periodic_task.id,
)
def _complete_scan(tenant_id, scan_id, provider_id):
other_scheduled = Scan.objects.filter(
tenant_id=tenant_id,
provider_id=provider_id,
trigger=Scan.TriggerChoices.SCHEDULED,
state__in=(StateChoices.SCHEDULED, StateChoices.AVAILABLE),
).exclude(id=scan_id)
assert not other_scheduled.exists()
scan_instance = Scan.objects.get(id=scan_id)
scan_instance.state = StateChoices.COMPLETED
scan_instance.save()
return {"status": "ok"}
with (
patch("tasks.tasks.perform_prowler_scan", side_effect=_complete_scan),
patch("tasks.tasks._perform_scan_complete_tasks"),
self._override_task_request(perform_scheduled_scan_task, id=task_id),
):
perform_scheduled_scan_task.run(
tenant_id=str(tenant.id), provider_id=str(provider.id)
)
assert not Scan.objects.filter(id=duplicate_scan.id).exists()
assert Scan.objects.filter(id=scheduled_scan.id).exists()
assert (
Scan.objects.filter(
tenant_id=tenant.id,
provider=provider,
trigger=Scan.TriggerChoices.SCHEDULED,
state__in=(StateChoices.SCHEDULED, StateChoices.AVAILABLE),
).count()
== 1
)


@@ -5,6 +5,10 @@ from enum import Enum
from django_celery_beat.models import PeriodicTask
from django_celery_results.models import TaskResult
from api.models import Scan, StateChoices
SCHEDULED_SCAN_NAME = "Daily scheduled scan"
class CustomEncoder(json.JSONEncoder):
def default(self, o):
@@ -71,3 +75,58 @@ def batched(iterable, batch_size):
batch = []
yield batch, True
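Only the tail of `batched` is visible in the hunk above. A plausible full implementation consistent with that tail; the `(batch, is_last)` contract for intermediate batches is an assumption, not confirmed by this diff:

```python
def batched(iterable, batch_size):
    """Yield (batch, is_last) pairs from iterable.

    Sketch inferred from the visible tail above: full batches are
    assumed to be yielded with is_last=False, and the final (possibly
    short or empty) batch is yielded with is_last=True.
    """
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch, False
            batch = []
    yield batch, True
```

Under this reading, an exact multiple of `batch_size` still ends with an empty `([], True)` pair, which callers would need to tolerate.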
def _get_or_create_scheduled_scan(
tenant_id: str,
provider_id: str,
scheduler_task_id: int,
scheduled_at: datetime,
update_state: bool = False,
) -> Scan:
"""
Get or create a scheduled scan, cleaning up duplicates if found.
Args:
tenant_id: The tenant ID.
provider_id: The provider ID.
scheduler_task_id: The PeriodicTask ID.
scheduled_at: The scheduled datetime for the scan.
update_state: If True, also reset state to SCHEDULED when updating.
Returns:
The scan instance to use.
"""
scheduled_scans = list(
Scan.objects.filter(
tenant_id=tenant_id,
provider_id=provider_id,
trigger=Scan.TriggerChoices.SCHEDULED,
state__in=(StateChoices.SCHEDULED, StateChoices.AVAILABLE),
scheduler_task_id=scheduler_task_id,
).order_by("scheduled_at", "inserted_at")
)
if scheduled_scans:
scan_instance = scheduled_scans[0]
if len(scheduled_scans) > 1:
Scan.objects.filter(id__in=[s.id for s in scheduled_scans[1:]]).delete()
needs_update = scan_instance.scheduled_at != scheduled_at
if update_state and scan_instance.state != StateChoices.SCHEDULED:
scan_instance.state = StateChoices.SCHEDULED
scan_instance.name = SCHEDULED_SCAN_NAME
needs_update = True
if needs_update:
scan_instance.scheduled_at = scheduled_at
scan_instance.save()
return scan_instance
return Scan.objects.create(
tenant_id=tenant_id,
name=SCHEDULED_SCAN_NAME,
provider_id=provider_id,
trigger=Scan.TriggerChoices.SCHEDULED,
state=StateChoices.SCHEDULED,
scheduled_at=scheduled_at,
scheduler_task_id=scheduler_task_id,
)
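The dedup rule above (keep the earliest SCHEDULED/AVAILABLE scan, delete the rest, then update the keeper) can be exercised with a small pure-function sketch; the dict-based stand-in for the ORM queryset is hypothetical:

```python
def dedupe_scheduled(scans: list, scheduled_at):
    """Return (kept_scan, deleted_scans) for pre-sorted candidate scans.

    scans must already be ordered by (scheduled_at, inserted_at), as the
    queryset above is. Returns (None, []) when no candidate exists and a
    new scan must be created instead.
    """
    if not scans:
        return None, []
    kept, duplicates = scans[0], scans[1:]
    if kept["scheduled_at"] != scheduled_at:
        # Only write back when the timestamp actually changed.
        kept = {**kept, "scheduled_at": scheduled_at}
    return kept, duplicates
```

This matches `test_dedupes_multiple_scheduled_scans_before_run` above: the earlier scan survives and the later duplicate is deleted.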


@@ -115,8 +115,8 @@ To update the environment file:
Edit the `.env` file and change version values:
```env
PROWLER_UI_VERSION="5.16.0"
PROWLER_API_VERSION="5.16.0"
PROWLER_UI_VERSION="5.17.0"
PROWLER_API_VERSION="5.17.0"
```
<Note>


@@ -38,7 +38,7 @@ class _MutableTimestamp:
timestamp = _MutableTimestamp(datetime.today())
timestamp_utc = _MutableTimestamp(datetime.now(timezone.utc))
prowler_version = "5.17.0"
prowler_version = "5.17.1"
html_logo_url = "https://github.com/prowler-cloud/prowler/"
square_logo_img = "https://raw.githubusercontent.com/prowler-cloud/prowler/dc7d2d5aeb92fdf12e8604f42ef6472cd3e8e889/docs/img/prowler-logo-black.png"
aws_logo = "https://user-images.githubusercontent.com/38561120/235953920-3e3fba08-0795-41dc-b480-9bea57db9f2e.png"


@@ -91,7 +91,7 @@ maintainers = [{name = "Prowler Engineering", email = "engineering@prowler.com"}
name = "prowler"
readme = "README.md"
requires-python = ">3.9.1,<3.13"
version = "5.17.0"
version = "5.17.1"
[project.scripts]
prowler = "prowler.__main__:prowler"