Compare commits

..

37 Commits

Author SHA1 Message Date
dependabot[bot] 5e488e2ee6 chore(deps): bump https://github.com/astral-sh/ruff-pre-commit
Bumps [https://github.com/astral-sh/ruff-pre-commit](https://github.com/astral-sh/ruff-pre-commit) from v0.15.11 to 0.15.12.
- [Release notes](https://github.com/astral-sh/ruff-pre-commit/releases)
- [Commits](https://github.com/astral-sh/ruff-pre-commit/compare/v0.15.11...v0.15.12)

---
updated-dependencies:
- dependency-name: https://github.com/astral-sh/ruff-pre-commit
  dependency-version: 0.15.12
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-05-02 01:46:08 +00:00
Hugo Pereira Brito 8db3a89669 ci: remove andoniaf from prowler-cloud (#10926) 2026-04-30 18:07:25 +02:00
Danny Lyubenov c802dc8a36 feat(codebuild): use batched API calls to prevent throttling and false positives (#10639)
Co-authored-by: Daniel Barranquero <danielbo2001@gmail.com>
2026-04-30 17:19:21 +02:00
Pedro Martín 3ab9a4efa5 chore(changelog): update with latest changes (#10948) 2026-04-30 14:13:40 +02:00
Pepe Fagoaga 36b8aa1b79 fix(boto3): pass config to clients (#10944) 2026-04-30 14:11:29 +02:00
Pedro Martín e821e07d7d docs(rbac): add Manage Alerts permission (#10947) 2026-04-30 13:58:17 +02:00
Boon 228fe6d579 feat: add ASD Essential Eight compliance framework for AWS (#10808)
Co-authored-by: Boon <boon@security8.work>
Co-authored-by: pedrooot <pedromarting3@gmail.com>
2026-04-30 13:49:08 +02:00
Pedro Martín 578186aa40 feat(sdk): integrate universal compliance into CLI pipeline (#10301) 2026-04-30 13:49:00 +02:00
Andoni Alonso 4608e45c8a fix(image): block parser-mismatch SSRF in registry auth (#10945) 2026-04-30 12:56:35 +02:00
Pedro Martín 5987651aee chore(README): update with latest changes (#10946) 2026-04-30 12:56:06 +02:00
Adrián Tomás 85800f2ddd chore(pre-commit): add priority tiers to .pre-commit-config.yaml (#10842)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-30 12:33:09 +02:00
Pablo Fernandez Guerra (PFE) 4fb5272362 refactor(ui): unify DataTable pagination into a single callback (#10863)
Co-authored-by: Pablo F.G <pablo.fernandez@prowler.com>
2026-04-30 08:58:11 +02:00
Pepe Fagoaga 85d38b5f71 feat(scans): Reset resource failed findings to 0 for ephemeral resources (#10929) 2026-04-29 19:08:16 +02:00
Prowler Bot 59dcdb87c4 chore(docs): Bump version to v5.25.1 (#10940)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-04-29 18:44:12 +02:00
Josema Camacho 9297453b8a fix(sdk): add autouse mock_aws fixture and leak detector to prevent AWS test leaks (#10605) 2026-04-29 17:49:40 +02:00
Davlet Dzhakishev dd37f4ee1f fix(azure): update flow log compliance text for NSG retirement (#10937) 2026-04-29 16:45:58 +02:00
Pepe Fagoaga 20f36f7c84 chore: changelog v5.25.1 (#10934) 2026-04-29 14:00:53 +02:00
Pablo Fernandez Guerra (PFE) ec4d27746f fix(ui): reposition compliance card export menu (#10918)
Co-authored-by: Pablo F.G <pablo.fernandez@prowler.com>
Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-04-29 13:52:36 +02:00
Andoni Alonso 7076900fb1 fix(kubernetes): use cluster name as provider_uid in OCSF output (#10483)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-04-29 13:45:49 +02:00
Josema Camacho 5d90352a0f fix(api): redirect scan report and compliance downloads to presigned S3 URLs (#10927) 2026-04-29 13:19:19 +02:00
Hugo Pereira Brito a981dc64a7 docs(sdk): link route53 changelog entry to PR (#10928) 2026-04-29 12:24:27 +02:00
Josema Camacho d2086cad3f fix(api): Attack Paths AWS region fallback and stale SCHEDULED cleanup (#10917) 2026-04-29 12:20:43 +02:00
Hugo Pereira Brito 380b89cfb6 fix(sdk): cover CNAME → dangling S3 in route53 takeover check (#10920) 2026-04-29 11:14:33 +01:00
Pablo Fernandez Guerra (PFE) 13b04d339b test(ui): add E2E tests for invitation accept smart router (#10814)
Co-authored-by: Pablo F.G <pablo.fernandez@prowler.com>
2026-04-29 10:27:30 +02:00
Pepe Fagoaga be3c5fb3c1 fix(cli): generate compliance after scan (#10919) 2026-04-28 17:18:30 +02:00
Davlet Dzhakishev 1de01bcb78 fix(azure): tighten flow log workspace checks (#10645)
Co-authored-by: Daniel Barranquero <danielbo2001@gmail.com>
2026-04-28 16:57:04 +02:00
baggers27 13d983450c fix(azure): broken link for minimum TLS version (#10916) 2026-04-28 14:23:00 +02:00
Daniel Barranquero 8b368e1343 feat(aws): add bedrock_guardrails_configured security check (#10844) 2026-04-28 14:16:19 +02:00
Prowler Bot c76a9baa20 chore(ui): Bump version to v5.26.0 (#10912)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-04-28 12:35:54 +02:00
Prowler Bot 30e2813e02 chore(docs): Bump version to v5.25.0 (#10909)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-04-28 12:35:32 +02:00
Prowler Bot 0f874c6ffd chore(sdk): Bump version to v5.26.0 (#10910)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-04-28 12:35:21 +02:00
Prowler Bot 2242689295 chore(api): Bump version to v1.27.0 (#10913)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-04-28 12:34:43 +02:00
Hugo Pereira Brito e252058af4 fix(m365): exclude guest users from entra_users_mfa_capable (#10785) 2026-04-28 08:58:16 +01:00
Pepe Fagoaga 37e6c9761f chore: changelog for v5.25.0 (#10900) 2026-04-28 08:47:20 +02:00
Pepe Fagoaga ebe666bec7 chore(boto3): configure user agent extra via env (#10904) 2026-04-28 08:01:11 +02:00
Pepe Fagoaga 7df2703db1 fix(aws): get organization's metadata with assumed role (#10894) 2026-04-27 22:15:11 +01:00
Kay Agahd 67234210ba feat(aws): add check secretsmanager_has_restrictive_resource_policy (#6985) 2026-04-27 21:49:34 +01:00
463 changed files with 13229 additions and 9885 deletions
+1 -1
View File
@@ -145,7 +145,7 @@ SENTRY_RELEASE=local
NEXT_PUBLIC_SENTRY_ENVIRONMENT=${SENTRY_ENVIRONMENT}
#### Prowler release version ####
NEXT_PUBLIC_PROWLER_RELEASE_VERSION=v5.25.0
NEXT_PUBLIC_PROWLER_RELEASE_VERSION=v5.26.0
# Social login credentials
SOCIAL_GOOGLE_OAUTH_CALLBACK_URL="${AUTH_URL}/api/auth/callback/google"
+1 -1
View File
@@ -62,7 +62,7 @@ jobs:
"Alan-TheGentleman"
"alejandrobailo"
"amitsharm"
"andoniaf"
# "andoniaf"
"cesararroba"
"danibarranqueroo"
"HugoPBrito"
+2 -2
View File
@@ -209,11 +209,11 @@ jobs:
echo "AWS service_paths='${STEPS_AWS_SERVICES_OUTPUTS_SERVICE_PATHS}'"
if [ "${STEPS_AWS_SERVICES_OUTPUTS_RUN_ALL}" = "true" ]; then
poetry run pytest -p no:randomly -n auto --cov=./prowler/providers/aws --cov-report=xml:aws_coverage.xml tests/providers/aws
poetry run pytest -n auto --cov=./prowler/providers/aws --cov-report=xml:aws_coverage.xml tests/providers/aws
elif [ -z "${STEPS_AWS_SERVICES_OUTPUTS_SERVICE_PATHS}" ]; then
echo "No AWS service paths detected; skipping AWS tests."
else
poetry run pytest -p no:randomly -n auto --cov=./prowler/providers/aws --cov-report=xml:aws_coverage.xml ${STEPS_AWS_SERVICES_OUTPUTS_SERVICE_PATHS}
poetry run pytest -n auto --cov=./prowler/providers/aws --cov-report=xml:aws_coverage.xml ${STEPS_AWS_SERVICES_OUTPUTS_SERVICE_PATHS}
fi
env:
STEPS_AWS_SERVICES_OUTPUTS_RUN_ALL: ${{ steps.aws-services.outputs.run_all }}
+37 -1
View File
@@ -1,17 +1,34 @@
# Priority tiers (lower = runs first, same priority = concurrent):
# P0 — fast file fixers
# P10 — validators and guards
# P20 — auto-formatters
# P30 — linters
# P40 — security scanners
# P50 — dependency validation
default_install_hook_types: [pre-commit, pre-push]
repos:
## GENERAL (prek built-in — no external repo needed)
- repo: builtin
hooks:
- id: check-merge-conflict
priority: 10
- id: check-yaml
args: ["--allow-multiple-documents"]
exclude: (prowler/config/llm_config.yaml|contrib/)
priority: 10
- id: check-json
priority: 10
- id: end-of-file-fixer
priority: 0
- id: trailing-whitespace
priority: 0
- id: no-commit-to-branch
priority: 10
- id: pretty-format-json
args: ["--autofix", --no-sort-keys, --no-ensure-ascii]
priority: 10
## TOML
- repo: https://github.com/macisamuele/language-formatters-pre-commit-hooks
@@ -20,6 +37,7 @@ repos:
- id: pretty-format-toml
args: [--autofix]
files: pyproject.toml
priority: 20
## GITHUB ACTIONS
- repo: https://github.com/zizmorcore/zizmor-pre-commit
@@ -27,6 +45,7 @@ repos:
hooks:
- id: zizmor
files: ^\.github/
priority: 30
## BASH
- repo: https://github.com/koalaman/shellcheck-precommit
@@ -34,6 +53,7 @@ repos:
hooks:
- id: shellcheck
exclude: contrib
priority: 30
## PYTHON — SDK (prowler/, tests/, dashboard/, util/, scripts/)
- repo: https://github.com/myint/autoflake
@@ -48,6 +68,7 @@ repos:
"--remove-all-unused-imports",
"--remove-unused-variable",
]
priority: 20
- repo: https://github.com/pycqa/isort
rev: 8.0.1
@@ -56,6 +77,7 @@ repos:
name: "SDK - isort"
files: { glob: ["{prowler,tests,dashboard,util,scripts}/**/*.py"] }
args: ["--profile", "black"]
priority: 20
- repo: https://github.com/psf/black
rev: 26.3.1
@@ -63,6 +85,7 @@ repos:
- id: black
name: "SDK - black"
files: { glob: ["{prowler,tests,dashboard,util,scripts}/**/*.py"] }
priority: 20
- repo: https://github.com/pycqa/flake8
rev: 7.3.0
@@ -71,18 +94,21 @@ repos:
name: "SDK - flake8"
files: { glob: ["{prowler,tests,dashboard,util,scripts}/**/*.py"] }
args: ["--ignore=E266,W503,E203,E501,W605"]
priority: 30
## PYTHON — API + MCP Server (ruff)
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.15.11
rev: v0.15.12
hooks:
- id: ruff
name: "API + MCP - ruff check"
files: { glob: ["{api,mcp_server}/**/*.py"] }
args: ["--fix"]
priority: 30
- id: ruff-format
name: "API + MCP - ruff format"
files: { glob: ["{api,mcp_server}/**/*.py"] }
priority: 20
## PYTHON — Poetry
- repo: https://github.com/python-poetry/poetry
@@ -93,24 +119,28 @@ repos:
args: ["--directory=./api"]
files: { glob: ["api/{pyproject.toml,poetry.lock}"] }
pass_filenames: false
priority: 50
- id: poetry-lock
name: API - poetry-lock
args: ["--directory=./api"]
files: { glob: ["api/{pyproject.toml,poetry.lock}"] }
pass_filenames: false
priority: 50
- id: poetry-check
name: SDK - poetry-check
args: ["--directory=./"]
files: { glob: ["{pyproject.toml,poetry.lock}"] }
pass_filenames: false
priority: 50
- id: poetry-lock
name: SDK - poetry-lock
args: ["--directory=./"]
files: { glob: ["{pyproject.toml,poetry.lock}"] }
pass_filenames: false
priority: 50
## CONTAINERS
- repo: https://github.com/hadolint/hadolint
@@ -118,6 +148,7 @@ repos:
hooks:
- id: hadolint
args: ["--ignore=DL3013"]
priority: 30
## LOCAL HOOKS
- repo: local
@@ -128,6 +159,7 @@ repos:
language: system
types: [python]
files: { glob: ["{prowler,tests,dashboard,util,scripts}/**/*.py"] }
priority: 30
- id: trufflehog
name: TruffleHog
@@ -138,6 +170,7 @@ repos:
language: system
pass_filenames: false
stages: ["pre-commit", "pre-push"]
priority: 40
- id: bandit
name: bandit
@@ -148,6 +181,7 @@ repos:
files: '.*\.py'
exclude:
{ glob: ["{contrib,skills}/**", "**/.venv/**", "**/*_test.py"] }
priority: 40
- id: safety
name: safety
@@ -166,6 +200,7 @@ repos:
".safety-policy.yml",
],
}
priority: 40
- id: vulture
name: vulture
@@ -174,3 +209,4 @@ repos:
language: system
types: [python]
files: '.*\.py'
priority: 40
+12 -12
View File
@@ -104,22 +104,22 @@ Every AWS provider scan will enqueue an Attack Paths ingestion job automatically
| Provider | Checks | Services | [Compliance Frameworks](https://docs.prowler.com/projects/prowler-open-source/en/latest/tutorials/compliance/) | [Categories](https://docs.prowler.com/projects/prowler-open-source/en/latest/tutorials/misc/#categories) | Support | Interface |
|---|---|---|---|---|---|---|
| AWS | 572 | 83 | 41 | 17 | Official | UI, API, CLI |
| Azure | 165 | 20 | 18 | 13 | Official | UI, API, CLI |
| GCP | 100 | 13 | 15 | 11 | Official | UI, API, CLI |
| Kubernetes | 83 | 7 | 7 | 9 | Official | UI, API, CLI |
| GitHub | 21 | 2 | 1 | 2 | Official | UI, API, CLI |
| M365 | 89 | 9 | 4 | 5 | Official | UI, API, CLI |
| OCI | 48 | 13 | 3 | 10 | Official | UI, API, CLI |
| Alibaba Cloud | 61 | 9 | 3 | 9 | Official | UI, API, CLI |
| Cloudflare | 29 | 2 | 0 | 5 | Official | UI, API, CLI |
| AWS | 595 | 84 | 43 | 17 | Official | UI, API, CLI |
| Azure | 167 | 22 | 19 | 16 | Official | UI, API, CLI |
| GCP | 102 | 18 | 17 | 12 | Official | UI, API, CLI |
| Kubernetes | 83 | 7 | 7 | 11 | Official | UI, API, CLI |
| GitHub | 24 | 3 | 1 | 5 | Official | UI, API, CLI |
| M365 | 101 | 10 | 4 | 10 | Official | UI, API, CLI |
| OCI | 51 | 14 | 4 | 10 | Official | UI, API, CLI |
| Alibaba Cloud | 61 | 9 | 4 | 9 | Official | UI, API, CLI |
| Cloudflare | 29 | 3 | 0 | 5 | Official | UI, API, CLI |
| IaC | [See `trivy` docs.](https://trivy.dev/latest/docs/coverage/iac/) | N/A | N/A | N/A | Official | UI, API, CLI |
| MongoDB Atlas | 10 | 3 | 0 | 8 | Official | UI, API, CLI |
| LLM | [See `promptfoo` docs.](https://www.promptfoo.dev/docs/red-team/plugins/) | N/A | N/A | N/A | Official | CLI |
| Image | N/A | N/A | N/A | N/A | Official | CLI, API |
| Google Workspace | 1 | 1 | 0 | 1 | Official | CLI |
| OpenStack | 27 | 4 | 0 | 8 | Official | UI, API, CLI |
| Vercel | 30 | 6 | 0 | 5 | Official | CLI |
| Google Workspace | 25 | 4 | 2 | 4 | Official | CLI |
| OpenStack | 34 | 5 | 0 | 9 | Official | UI, API, CLI |
| Vercel | 26 | 6 | 0 | 5 | Official | CLI |
| NHN | 6 | 2 | 1 | 0 | Unofficial | CLI |
> [!Note]
+21 -4
View File
@@ -2,19 +2,36 @@
All notable changes to the **Prowler API** are documented in this file.
## [1.26.0] (Prowler UNRELEASED)
## [1.27.0] (Prowler UNRELEASED)
### 🚀 Added
- New `scan-reset-ephemeral-resources` post-scan task zeroes `failed_findings_count` for resources missing from the latest full-scope scan, keeping ephemeral resources from polluting the Resources page sort [(#10929)](https://github.com/prowler-cloud/prowler/pull/10929)
---
## [1.26.1] (Prowler v5.25.1)
### 🐞 Fixed
- Attack Paths: AWS scans no longer fail when enabled regions cannot be retrieved, and scans stuck in `scheduled` state are now cleaned up after the stale threshold [(#10917)](https://github.com/prowler-cloud/prowler/pull/10917)
- Scan report and compliance downloads now redirect to a presigned S3 URL instead of streaming through the API worker, preventing gunicorn timeouts on large files [(#10927)](https://github.com/prowler-cloud/prowler/pull/10927)
---
## [1.26.0] (Prowler v5.25.0)
### 🚀 Added
- CIS Benchmark PDF report generation for scans, exposing the latest CIS version per provider via `GET /scans/{id}/cis/{name}/` [(#10650)](https://github.com/prowler-cloud/prowler/pull/10650)
- `/overviews/resource-groups` (resource inventory), `/overviews/categories` and `/overviews/attack-surfaces` now reflect newly-muted findings without waiting for the next scan. The post-mute `reaggregate-all-finding-group-summaries` task now also dispatches `aggregate_scan_resource_group_summaries_task`, `aggregate_scan_category_summaries_task` and `aggregate_attack_surface_task` per latest scan of every `(provider, day)` pair, rebuilding `ScanGroupSummary`, `ScanCategorySummary` and `AttackSurfaceOverview` alongside the tables already covered in #10827 [(#10843)](https://github.com/prowler-cloud/prowler/pull/10843)
- CIS Benchmark PDF report generation for scans, exposing the latest CIS version per provider via `GET /scans/{id}/cis/{name}/` and picking the variant dynamically via `_pick_latest_cis_variant` (no hard-coded provider → version mapping) [(#10650)](https://github.com/prowler-cloud/prowler/pull/10650)
- Install zizmor v1.24.1 in API Docker image for GitHub Actions workflow scanning [(#10607)](https://github.com/prowler-cloud/prowler/pull/10607)
### 🔄 Changed
- Allows tenant owners to expel users from their organizations [(#10787)](https://github.com/prowler-cloud/prowler/pull/10787)
- Allows tenant owners to expel users from their organizations [(#10787)](https://github.com/prowler-cloud/prowler/pull/10787)
- `aggregate_findings`, `aggregate_attack_surface`, `aggregate_scan_resource_group_summaries` and `aggregate_scan_category_summaries` now upsert via `bulk_create(update_conflicts=True, ...)` instead of the prior `ignore_conflicts=True` / plain INSERT / `already backfilled` short-circuit. Re-runs triggered by the post-mute reaggregation pipeline no longer trip the `unique_*_per_scan` constraints nor silently drop updates, and are race-safe under concurrent writers (e.g. scan completion overlapping with a fresh mute rule) [(#10843)](https://github.com/prowler-cloud/prowler/pull/10843)
- Rename the scan-category and scan-resource-group summary aggregators from `backfill_*` to `aggregate_*` (`backfill_scan_category_summaries` -> `aggregate_scan_category_summaries`, `backfill_scan_resource_group_summaries` -> `aggregate_scan_resource_group_summaries`; Celery task names `backfill-scan-category-summaries` -> `scan-category-summaries`, `backfill-scan-resource-group-summaries` -> `scan-resource-group-summaries`) and move them to the `overview` queue, matching the sibling per-scan aggregators (`perform_scan_summary_task`, `aggregate_daily_severity_task`, `aggregate_finding_group_summaries_task`, `aggregate_attack_surface_task`). The old names had no dispatchers outside the post-mute reaggregation chain, so no task-registry migration is required [(#10843)](https://github.com/prowler-cloud/prowler/pull/10843)
- Rename the scan-category and scan-resource-group summary aggregators from `backfill_*` to `aggregate_*` [(#10843)](https://github.com/prowler-cloud/prowler/pull/10843)
### 🐞 Fixed
+1 -1
View File
@@ -50,7 +50,7 @@ name = "prowler-api"
package-mode = false
# Needed for the SDK compatibility
requires-python = ">=3.11,<3.13"
version = "1.26.0"
version = "1.27.0"
[project.scripts]
celery = "src.backend.config.settings.celery"
+2 -2
View File
@@ -52,7 +52,7 @@ class ApiConfig(AppConfig):
"check_and_fix_socialaccount_sites_migration",
]
# Skip Neo4j initialization during tests, some Django commands, and Celery
# Skip eager Neo4j init for tests, some Django commands, and Celery (prefork pool: driver must stay lazy, no post_fork hook)
if getattr(settings, "TESTING", False) or (
len(sys.argv) > 1
and (
@@ -64,7 +64,7 @@ class ApiConfig(AppConfig):
)
):
logger.info(
"Skipping Neo4j initialization because tests, some Django commands or Celery"
"Skipping eager Neo4j init: tests, some Django commands, or Celery prefork pool (driver stays lazy)"
)
else:
+48
View File
@@ -595,10 +595,40 @@ class Scan(RowLevelSecurityProtectedModel):
objects = ActiveProviderManager()
all_objects = models.Manager()
_SCOPING_SCANNER_ARG_KEYS_CACHE: tuple[str, ...] | None = None
@classmethod
def get_scoping_scanner_arg_keys(cls) -> tuple[str, ...]:
"""Return the scanner_args keys that mark a scan as scoped.
Derived from ``prowler.lib.scan.scan.Scan.__init__`` so the API stays
in sync with whatever the SDK actually accepts as filters. Cached at
class level — the signature is stable for the process lifetime.
"""
if cls._SCOPING_SCANNER_ARG_KEYS_CACHE is None:
import inspect
from prowler.lib.scan.scan import Scan as ProwlerScan
params = inspect.signature(ProwlerScan.__init__).parameters
cls._SCOPING_SCANNER_ARG_KEYS_CACHE = tuple(
name for name in params if name not in ("self", "provider")
)
return cls._SCOPING_SCANNER_ARG_KEYS_CACHE
class TriggerChoices(models.TextChoices):
SCHEDULED = "scheduled", _("Scheduled")
MANUAL = "manual", _("Manual")
# Trigger values for scans that ran the SDK end-to-end. Imported scans (or
# any future trigger) are intentionally NOT in this set — they may carry
# only a partial slice of resources, so post-scan logic that depends on a
# full-scope sweep (e.g. resetting ephemeral resource findings) must skip
# them by default.
LIVE_SCAN_TRIGGERS = frozenset(
(TriggerChoices.SCHEDULED.value, TriggerChoices.MANUAL.value)
)
id = models.UUIDField(primary_key=True, default=uuid7, editable=False)
name = models.CharField(
blank=True, null=True, max_length=100, validators=[MinLengthValidator(3)]
@@ -681,6 +711,24 @@ class Scan(RowLevelSecurityProtectedModel):
class JSONAPIMeta:
resource_name = "scans"
def is_full_scope(self) -> bool:
"""Return True if this scan ran with no scoping filters at all.
Used to gate post-scan operations (such as resetting the
failed_findings_count of resources missing from the scan) that are only
safe when the scan covered every check, service, and category. Imported
scans are NOT full-scope by definition — they may carry only a partial
slice of resources, so they're rejected via ``trigger`` even before the
scanner_args check.
"""
if self.trigger not in self.LIVE_SCAN_TRIGGERS:
return False
scanner_args = self.scanner_args or {}
for key in self.get_scoping_scanner_arg_keys():
if scanner_args.get(key):
return False
return True
class AttackPathsScan(RowLevelSecurityProtectedModel):
objects = ActiveProviderManager()
File diff suppressed because it is too large Load Diff
+59 -28
View File
@@ -3841,9 +3841,14 @@ class TestScanViewSet:
"prowler-output-123_threatscore_report.pdf",
)
presigned_url = (
"https://test-bucket.s3.amazonaws.com/"
"tenant-id/scan-id/threatscore/prowler-output-123_threatscore_report.pdf"
"?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Expires=300"
)
mock_s3_client = Mock()
mock_s3_client.list_objects_v2.return_value = {"Contents": [{"Key": pdf_key}]}
mock_s3_client.get_object.return_value = {"Body": io.BytesIO(b"pdf-bytes")}
mock_s3_client.generate_presigned_url.return_value = presigned_url
mock_env_str.return_value = bucket
mock_get_s3_client.return_value = mock_s3_client
@@ -3852,19 +3857,26 @@ class TestScanViewSet:
url = reverse("scan-threatscore", kwargs={"pk": scan.id})
response = authenticated_client.get(url)
assert response.status_code == status.HTTP_200_OK
assert response["Content-Type"] == "application/pdf"
assert response["Content-Disposition"].endswith(
'"prowler-output-123_threatscore_report.pdf"'
)
assert response.content == b"pdf-bytes"
assert response.status_code == status.HTTP_302_FOUND
assert response["Location"] == presigned_url
mock_s3_client.list_objects_v2.assert_called_once()
mock_s3_client.get_object.assert_called_once_with(Bucket=bucket, Key=pdf_key)
mock_s3_client.generate_presigned_url.assert_called_once_with(
"get_object",
Params={
"Bucket": bucket,
"Key": pdf_key,
"ResponseContentDisposition": (
'attachment; filename="prowler-output-123_threatscore_report.pdf"'
),
"ResponseContentType": "application/pdf",
},
ExpiresIn=300,
)
def test_report_s3_success(self, authenticated_client, scans_fixture, monkeypatch):
"""
When output_location is an S3 URL and the S3 client returns the file successfully,
the view should return the ZIP file with HTTP 200 and proper headers.
When output_location is an S3 URL and the object exists,
the view should return a 302 redirect to a presigned S3 URL.
"""
scan = scans_fixture[0]
bucket = "test-bucket"
@@ -3878,22 +3890,33 @@ class TestScanViewSet:
type("env", (), {"str": lambda self, *args, **kwargs: "test-bucket"})(),
)
presigned_url = (
"https://test-bucket.s3.amazonaws.com/report.zip"
"?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Expires=300"
)
class FakeS3Client:
def get_object(self, Bucket, Key):
def head_object(self, Bucket, Key):
assert Bucket == bucket
assert Key == key
return {"Body": io.BytesIO(b"s3 zip content")}
return {}
def generate_presigned_url(self, ClientMethod, Params, ExpiresIn):
assert ClientMethod == "get_object"
assert Params["Bucket"] == bucket
assert Params["Key"] == key
assert Params["ResponseContentDisposition"] == (
'attachment; filename="report.zip"'
)
assert ExpiresIn == 300
return presigned_url
monkeypatch.setattr("api.v1.views.get_s3_client", lambda: FakeS3Client())
url = reverse("scan-report", kwargs={"pk": scan.id})
response = authenticated_client.get(url)
assert response.status_code == 200
expected_filename = os.path.basename("report.zip")
content_disposition = response.get("Content-Disposition")
assert content_disposition.startswith('attachment; filename="')
assert f'filename="{expected_filename}"' in content_disposition
assert response.content == b"s3 zip content"
assert response.status_code == status.HTTP_302_FOUND
assert response["Location"] == presigned_url
def test_report_s3_success_no_local_files(
self, authenticated_client, scans_fixture, monkeypatch
@@ -4032,23 +4055,31 @@ class TestScanViewSet:
)
match_key = "path/compliance/mitre_attack_aws.csv"
presigned_url = (
"https://test-bucket.s3.amazonaws.com/path/compliance/mitre_attack_aws.csv"
"?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Expires=300"
)
class FakeS3Client:
def list_objects_v2(self, Bucket, Prefix):
return {"Contents": [{"Key": match_key}]}
def get_object(self, Bucket, Key):
return {"Body": io.BytesIO(b"ignored")}
def generate_presigned_url(self, ClientMethod, Params, ExpiresIn):
assert ClientMethod == "get_object"
assert Params["Key"] == match_key
assert Params["ResponseContentDisposition"] == (
'attachment; filename="mitre_attack_aws.csv"'
)
assert ExpiresIn == 300
return presigned_url
monkeypatch.setattr("api.v1.views.get_s3_client", lambda: FakeS3Client())
framework = match_key.split("/")[-1].split(".")[0]
url = reverse("scan-compliance", kwargs={"pk": scan.id, "name": framework})
resp = authenticated_client.get(url)
assert resp.status_code == status.HTTP_200_OK
cd = resp["Content-Disposition"]
assert cd.startswith('attachment; filename="')
assert cd.endswith('filename="mitre_attack_aws.csv"')
assert resp.status_code == status.HTTP_302_FOUND
assert resp["Location"] == presigned_url
def test_compliance_s3_not_found(
self, authenticated_client, scans_fixture, monkeypatch
@@ -4251,8 +4282,8 @@ class TestScanViewSet:
scan.save()
fake_client = MagicMock()
fake_client.get_object.side_effect = ClientError(
{"Error": {"Code": "NoSuchKey"}}, "GetObject"
fake_client.head_object.side_effect = ClientError(
{"Error": {"Code": "NoSuchKey"}}, "HeadObject"
)
mock_get_s3_client.return_value = fake_client
@@ -4275,8 +4306,8 @@ class TestScanViewSet:
scan.save()
fake_client = MagicMock()
fake_client.get_object.side_effect = ClientError(
{"Error": {"Code": "AccessDenied"}}, "GetObject"
fake_client.head_object.side_effect = ClientError(
{"Error": {"Code": "AccessDenied"}}, "HeadObject"
)
mock_get_s3_client.return_value = fake_client
+112 -38
View File
@@ -53,7 +53,7 @@ from django.db.models import (
)
from django.db.models.fields.json import KeyTextTransform
from django.db.models.functions import Cast, Coalesce, RowNumber
from django.http import HttpResponse, QueryDict
from django.http import HttpResponse, HttpResponseBase, HttpResponseRedirect, QueryDict
from django.shortcuts import redirect
from django.urls import reverse
from django.utils.dateparse import parse_date
@@ -422,7 +422,7 @@ class SchemaView(SpectacularAPIView):
def get(self, request, *args, **kwargs):
spectacular_settings.TITLE = "Prowler API"
spectacular_settings.VERSION = "1.26.0"
spectacular_settings.VERSION = "1.27.0"
spectacular_settings.DESCRIPTION = (
"Prowler API specification.\n\nThis file is auto-generated."
)
@@ -2080,24 +2080,38 @@ class ScanViewSet(BaseRLSViewSet):
},
)
def _load_file(self, path_pattern, s3=False, bucket=None, list_objects=False):
def _load_file(
self,
path_pattern,
s3=False,
bucket=None,
list_objects=False,
content_type=None,
):
"""
Loads a binary file (e.g., ZIP or CSV) and returns its content and filename.
Resolve a report file location and return the bytes (filesystem) or a redirect (S3).
Depending on the input parameters, this method supports loading:
- From S3 using a direct key.
- From S3 by listing objects under a prefix and matching suffix.
- From the local filesystem using glob pattern matching.
- From S3 using a direct key, returns a 302 to a short-lived presigned URL.
- From S3 by listing objects under a prefix and matching suffix, returns a 302 to a short-lived presigned URL.
- From the local filesystem using glob pattern matching, returns the file bytes.
The S3 branch never streams bytes through the worker; this prevents gunicorn
worker timeouts on large reports.
Args:
path_pattern (str): The key or glob pattern representing the file location.
s3 (bool, optional): Whether the file is stored in S3. Defaults to False.
bucket (str, optional): The name of the S3 bucket, required if `s3=True`. Defaults to None.
list_objects (bool, optional): If True and `s3=True`, list objects by prefix to find the file. Defaults to False.
content_type (str, optional): On the S3 branch, forwarded as `ResponseContentType`
so the presigned download advertises the same Content-Type the API used to send.
Ignored on the filesystem branch.
Returns:
tuple[bytes, str]: A tuple containing the file content as bytes and the filename if successful.
Response: A DRF `Response` object with an appropriate status and error detail if an error occurs.
tuple[bytes, str]: For the filesystem branch, the file content and filename.
HttpResponseRedirect: For the S3 branch on success, a 302 redirect to a presigned `GetObject` URL.
Response: For any error path, a DRF `Response` with an appropriate status and detail.
"""
if s3:
try:
@@ -2144,25 +2158,45 @@ class ScanViewSet(BaseRLSViewSet):
# path_pattern here is prefix, but in compliance we build correct suffix check before
key = keys[0]
else:
# path_pattern is exact key
# path_pattern is exact key; HEAD before presigning to preserve the 404 contract.
key = path_pattern
try:
s3_obj = client.get_object(Bucket=bucket, Key=key)
except ClientError as e:
code = e.response.get("Error", {}).get("Code")
if code == "NoSuchKey":
try:
client.head_object(Bucket=bucket, Key=key)
except ClientError as e:
code = e.response.get("Error", {}).get("Code")
if code in ("NoSuchKey", "404"):
return Response(
{
"detail": "The scan has no reports, or the report generation task has not started yet."
},
status=status.HTTP_404_NOT_FOUND,
)
return Response(
{
"detail": "The scan has no reports, or the report generation task has not started yet."
},
status=status.HTTP_404_NOT_FOUND,
{"detail": "There is a problem with credentials."},
status=status.HTTP_403_FORBIDDEN,
)
return Response(
{"detail": "There is a problem with credentials."},
status=status.HTTP_403_FORBIDDEN,
)
content = s3_obj["Body"].read()
filename = os.path.basename(key)
# escape quotes and strip CR/LF so a malformed key cannot break out of the header
safe_filename = (
filename.replace("\\", "\\\\")
.replace('"', '\\"')
.replace("\r", "")
.replace("\n", "")
)
params = {
"Bucket": bucket,
"Key": key,
"ResponseContentDisposition": f'attachment; filename="{safe_filename}"',
}
if content_type:
params["ResponseContentType"] = content_type
url = client.generate_presigned_url(
"get_object",
Params=params,
ExpiresIn=300,
)
return HttpResponseRedirect(url)
else:
files = glob.glob(path_pattern)
if not files:
@@ -2205,12 +2239,16 @@ class ScanViewSet(BaseRLSViewSet):
bucket = env.str("DJANGO_OUTPUT_S3_AWS_OUTPUT_BUCKET", "")
key_prefix = scan.output_location.removeprefix(f"s3://{bucket}/")
loader = self._load_file(
key_prefix, s3=True, bucket=bucket, list_objects=False
key_prefix,
s3=True,
bucket=bucket,
list_objects=False,
content_type="application/x-zip-compressed",
)
else:
loader = self._load_file(scan.output_location, s3=False)
if isinstance(loader, Response):
if isinstance(loader, HttpResponseBase):
return loader
content, filename = loader
@@ -2248,13 +2286,19 @@ class ScanViewSet(BaseRLSViewSet):
prefix = os.path.join(
os.path.dirname(key_prefix), "compliance", f"{name}.csv"
)
loader = self._load_file(prefix, s3=True, bucket=bucket, list_objects=True)
loader = self._load_file(
prefix,
s3=True,
bucket=bucket,
list_objects=True,
content_type="text/csv",
)
else:
base = os.path.dirname(scan.output_location)
pattern = os.path.join(base, "compliance", f"*_{name}.csv")
loader = self._load_file(pattern, s3=False)
if isinstance(loader, Response):
if isinstance(loader, HttpResponseBase):
return loader
content, filename = loader
@@ -2287,13 +2331,19 @@ class ScanViewSet(BaseRLSViewSet):
"cis",
"*_cis_report.pdf",
)
loader = self._load_file(prefix, s3=True, bucket=bucket, list_objects=True)
loader = self._load_file(
prefix,
s3=True,
bucket=bucket,
list_objects=True,
content_type="application/pdf",
)
else:
base = os.path.dirname(scan.output_location)
pattern = os.path.join(base, "cis", "*_cis_report.pdf")
loader = self._load_file(pattern, s3=False)
if isinstance(loader, Response):
if isinstance(loader, HttpResponseBase):
return loader
content, filename = loader
@@ -2327,13 +2377,19 @@ class ScanViewSet(BaseRLSViewSet):
"threatscore",
"*_threatscore_report.pdf",
)
loader = self._load_file(prefix, s3=True, bucket=bucket, list_objects=True)
loader = self._load_file(
prefix,
s3=True,
bucket=bucket,
list_objects=True,
content_type="application/pdf",
)
else:
base = os.path.dirname(scan.output_location)
pattern = os.path.join(base, "threatscore", "*_threatscore_report.pdf")
loader = self._load_file(pattern, s3=False)
if isinstance(loader, Response):
if isinstance(loader, HttpResponseBase):
return loader
content, filename = loader
@@ -2367,13 +2423,19 @@ class ScanViewSet(BaseRLSViewSet):
"ens",
"*_ens_report.pdf",
)
loader = self._load_file(prefix, s3=True, bucket=bucket, list_objects=True)
loader = self._load_file(
prefix,
s3=True,
bucket=bucket,
list_objects=True,
content_type="application/pdf",
)
else:
base = os.path.dirname(scan.output_location)
pattern = os.path.join(base, "ens", "*_ens_report.pdf")
loader = self._load_file(pattern, s3=False)
if isinstance(loader, Response):
if isinstance(loader, HttpResponseBase):
return loader
content, filename = loader
@@ -2406,13 +2468,19 @@ class ScanViewSet(BaseRLSViewSet):
"nis2",
"*_nis2_report.pdf",
)
loader = self._load_file(prefix, s3=True, bucket=bucket, list_objects=True)
loader = self._load_file(
prefix,
s3=True,
bucket=bucket,
list_objects=True,
content_type="application/pdf",
)
else:
base = os.path.dirname(scan.output_location)
pattern = os.path.join(base, "nis2", "*_nis2_report.pdf")
loader = self._load_file(pattern, s3=False)
if isinstance(loader, Response):
if isinstance(loader, HttpResponseBase):
return loader
content, filename = loader
@@ -2445,13 +2513,19 @@ class ScanViewSet(BaseRLSViewSet):
"csa",
"*_csa_report.pdf",
)
loader = self._load_file(prefix, s3=True, bucket=bucket, list_objects=True)
loader = self._load_file(
prefix,
s3=True,
bucket=bucket,
list_objects=True,
content_type="application/pdf",
)
else:
base = os.path.dirname(scan.output_location)
pattern = os.path.join(base, "csa", "*_csa_report.pdf")
loader = self._load_file(pattern, s3=False)
if isinstance(loader, Response):
if isinstance(loader, HttpResponseBase):
return loader
content, filename = loader
+43 -1
View File
@@ -49,7 +49,7 @@ def start_aws_ingestion(
}
boto3_session = get_boto3_session(prowler_api_provider, prowler_sdk_provider)
regions: list[str] = list(prowler_sdk_provider._enabled_regions)
regions: list[str] = resolve_aws_regions(prowler_api_provider, prowler_sdk_provider)
requested_syncs = list(cartography_aws.RESOURCE_FUNCTIONS.keys())
sync_args = cartography_aws._build_aws_sync_kwargs(
@@ -226,6 +226,48 @@ def get_boto3_session(
return boto3_session
def resolve_aws_regions(
prowler_api_provider: ProwlerAPIProvider,
prowler_sdk_provider: ProwlerSDKProvider,
) -> list[str]:
"""Resolve the regions to scan, falling back when `_enabled_regions` is `None`.
The SDK silently sets `_enabled_regions` to `None` when `ec2:DescribeRegions`
fails (missing IAM permission, transient error). Without a fallback the
Cartography ingestion crashes with a non-actionable `TypeError`. Try the
user's `audited_regions` next, then the partition's static region list.
Excluded regions are honored on every branch.
"""
if prowler_sdk_provider._enabled_regions is not None:
regions = set(prowler_sdk_provider._enabled_regions)
elif prowler_sdk_provider.identity.audited_regions:
regions = set(prowler_sdk_provider.identity.audited_regions)
else:
partition = prowler_sdk_provider.identity.partition
try:
regions = prowler_sdk_provider.get_available_aws_service_regions(
"ec2", partition
)
except KeyError:
raise RuntimeError(
f"No region data available for partition {partition!r}; "
f"cannot determine regions to scan for "
f"{prowler_api_provider.uid}"
)
logger.warning(
f"Could not enumerate enabled regions for AWS account "
f"{prowler_api_provider.uid}; falling back to all regions in "
f"partition {partition!r}"
)
excluded = set(getattr(prowler_sdk_provider, "_excluded_regions", None) or ())
return sorted(regions - excluded)
def get_aioboto3_session(boto3_session: boto3.Session) -> aioboto3.Session:
return aioboto3.Session(botocore_session=boto3_session._session)
@@ -18,28 +18,45 @@ logger = get_task_logger(__name__)
def cleanup_stale_attack_paths_scans() -> dict:
"""
Find `EXECUTING` `AttackPathsScan` scans whose workers are dead or that have
exceeded the stale threshold, and mark them as `FAILED`.
Mark stale `AttackPathsScan` rows as `FAILED`.
Two-pass detection:
Covers two stuck-state scenarios:
1. `EXECUTING` scans whose workers are dead, or that have exceeded the
stale threshold while alive.
2. `SCHEDULED` scans that never made it to a worker — parent scan
crashed before dispatch, broker lost the message, etc. Detected by
age plus the parent `Scan` no longer being in flight.
"""
threshold = timedelta(minutes=ATTACK_PATHS_SCAN_STALE_THRESHOLD_MINUTES)
now = datetime.now(tz=timezone.utc)
cutoff = now - threshold
cleaned_up: list[str] = []
cleaned_up.extend(_cleanup_stale_executing_scans(cutoff))
cleaned_up.extend(_cleanup_stale_scheduled_scans(cutoff))
logger.info(
f"Stale `AttackPathsScan` cleanup: {len(cleaned_up)} scan(s) cleaned up"
)
return {"cleaned_up_count": len(cleaned_up), "scan_ids": cleaned_up}
def _cleanup_stale_executing_scans(cutoff: datetime) -> list[str]:
"""
Two-pass detection for `EXECUTING` scans:
1. If `TaskResult.worker` exists, ping the worker.
- Dead worker: cleanup immediately (any age).
- Alive + past threshold: revoke the task, then cleanup.
- Alive + within threshold: skip.
2. If no worker field: fall back to time-based heuristic only.
"""
threshold = timedelta(minutes=ATTACK_PATHS_SCAN_STALE_THRESHOLD_MINUTES)
now = datetime.now(tz=timezone.utc)
cutoff = now - threshold
executing_scans = (
executing_scans = list(
AttackPathsScan.all_objects.using(MainRouter.admin_db)
.filter(state=StateChoices.EXECUTING)
.select_related("task__task_runner_task")
)
# Cache worker liveness so each worker is pinged at most once
executing_scans = list(executing_scans)
workers = {
tr.worker
for scan in executing_scans
@@ -48,7 +65,7 @@ def cleanup_stale_attack_paths_scans() -> dict:
}
worker_alive = {w: _is_worker_alive(w) for w in workers}
cleaned_up = []
cleaned_up: list[str] = []
for scan in executing_scans:
task_result = (
@@ -65,9 +82,7 @@ def cleanup_stale_attack_paths_scans() -> dict:
# Alive but stale — revoke before cleanup
_revoke_task(task_result)
reason = (
"Scan exceeded stale threshold — " "cleaned up by periodic task"
)
reason = "Scan exceeded stale threshold — cleaned up by periodic task"
else:
reason = "Worker dead — cleaned up by periodic task"
else:
@@ -82,10 +97,57 @@ def cleanup_stale_attack_paths_scans() -> dict:
if _cleanup_scan(scan, task_result, reason):
cleaned_up.append(str(scan.id))
logger.info(
f"Stale `AttackPathsScan` cleanup: {len(cleaned_up)} scan(s) cleaned up"
return cleaned_up
def _cleanup_stale_scheduled_scans(cutoff: datetime) -> list[str]:
"""
Cleanup `SCHEDULED` scans that never reached a worker.
Detection:
- `state == SCHEDULED`
- `started_at < cutoff`
- parent `Scan` is no longer in flight (terminal state or missing). This
avoids cleaning up rows whose parent Prowler scan is legitimately still
running.
For each match: revoke the queued task (best-effort; harmless if already
consumed), atomically flip to `FAILED`, and mark the `TaskResult`. The
temp Neo4j database is never created while `SCHEDULED`, so no drop is
needed.
"""
scheduled_scans = list(
AttackPathsScan.all_objects.using(MainRouter.admin_db)
.filter(
state=StateChoices.SCHEDULED,
started_at__lt=cutoff,
)
.select_related("task__task_runner_task", "scan")
)
return {"cleaned_up_count": len(cleaned_up), "scan_ids": cleaned_up}
cleaned_up: list[str] = []
parent_terminal = (
StateChoices.COMPLETED,
StateChoices.FAILED,
StateChoices.CANCELLED,
)
for scan in scheduled_scans:
parent_scan = scan.scan
if parent_scan is not None and parent_scan.state not in parent_terminal:
continue
task_result = (
getattr(scan.task, "task_runner_task", None) if scan.task else None
)
if task_result:
_revoke_task(task_result, terminate=False)
reason = "Scan never started — cleaned up by periodic task"
if _cleanup_scheduled_scan(scan, task_result, reason):
cleaned_up.append(str(scan.id))
return cleaned_up
def _is_worker_alive(worker: str) -> bool:
@@ -98,12 +160,17 @@ def _is_worker_alive(worker: str) -> bool:
return True
def _revoke_task(task_result) -> None:
"""Send `SIGTERM` to a hung Celery task. Non-fatal on failure."""
def _revoke_task(task_result, terminate: bool = True) -> None:
"""Revoke a Celery task. Non-fatal on failure.
`terminate=True` SIGTERMs the worker if the task is mid-execution; use
for EXECUTING cleanup. `terminate=False` only marks the task id revoked
across workers, so any worker pulling the queued message discards it;
use for SCHEDULED cleanup where the task hasn't run yet.
"""
try:
current_app.control.revoke(
task_result.task_id, terminate=True, signal="SIGTERM"
)
kwargs = {"terminate": True, "signal": "SIGTERM"} if terminate else {}
current_app.control.revoke(task_result.task_id, **kwargs)
logger.info(f"Revoked task {task_result.task_id}")
except Exception:
logger.exception(f"Failed to revoke task {task_result.task_id}")
@@ -125,28 +192,64 @@ def _cleanup_scan(scan, task_result, reason: str) -> bool:
except Exception:
logger.exception(f"Failed to drop temp database {tmp_db_name}")
# 2. Lock row, verify still EXECUTING, mark FAILED — all atomic
with rls_transaction(str(scan.tenant_id)):
try:
fresh_scan = AttackPathsScan.objects.select_for_update().get(id=scan.id)
except AttackPathsScan.DoesNotExist:
logger.warning(f"Scan {scan_id_str} no longer exists, skipping")
return False
fresh_scan = _finalize_failed_scan(scan, StateChoices.EXECUTING, reason)
if fresh_scan is None:
return False
if fresh_scan.state != StateChoices.EXECUTING:
logger.info(f"Scan {scan_id_str} is now {fresh_scan.state}, skipping")
return False
_mark_scan_finished(fresh_scan, StateChoices.FAILED, {"global_error": reason})
# 3. Mark `TaskResult` as `FAILURE` (not RLS-protected, outside lock)
# Mark `TaskResult` as `FAILURE` (not RLS-protected, outside lock)
if task_result:
task_result.status = states.FAILURE
task_result.date_done = datetime.now(tz=timezone.utc)
task_result.save(update_fields=["status", "date_done"])
# 4. Recover graph_data_ready if provider data still exists
recover_graph_data_ready(fresh_scan)
logger.info(f"Cleaned up stale scan {scan_id_str}: {reason}")
return True
def _cleanup_scheduled_scan(scan, task_result, reason: str) -> bool:
"""
Clean up a `SCHEDULED` scan that never reached a worker.
Skips the temp Neo4j drop — the database is only created once the worker
enters `EXECUTING`, so dropping it here just produces noisy log output.
Returns `True` if the scan was actually cleaned up, `False` if skipped.
"""
scan_id_str = str(scan.id)
fresh_scan = _finalize_failed_scan(scan, StateChoices.SCHEDULED, reason)
if fresh_scan is None:
return False
if task_result:
task_result.status = states.FAILURE
task_result.date_done = datetime.now(tz=timezone.utc)
task_result.save(update_fields=["status", "date_done"])
logger.info(f"Cleaned up scheduled scan {scan_id_str}: {reason}")
return True
def _finalize_failed_scan(scan, expected_state: str, reason: str):
"""
Atomically lock the row, verify it's still in `expected_state`, and
mark it `FAILED`. Returns the locked row on success, `None` if the
row is gone or has already moved on.
"""
scan_id_str = str(scan.id)
with rls_transaction(str(scan.tenant_id)):
try:
fresh_scan = AttackPathsScan.objects.select_for_update().get(id=scan.id)
except AttackPathsScan.DoesNotExist:
logger.warning(f"Scan {scan_id_str} no longer exists, skipping")
return None
if fresh_scan.state != expected_state:
logger.info(f"Scan {scan_id_str} is now {fresh_scan.state}, skipping")
return None
_mark_scan_finished(fresh_scan, StateChoices.FAILED, {"global_error": reason})
return fresh_scan
@@ -67,25 +67,52 @@ def retrieve_attack_paths_scan(
return None
def set_attack_paths_scan_task_id(
tenant_id: str,
scan_pk: str,
task_id: str,
) -> None:
"""Persist the Celery `task_id` on the `AttackPathsScan` row.
Called at dispatch time (when `apply_async` returns) so the row carries
the task id even while still `SCHEDULED`. This lets the periodic
cleanup revoke queued messages for scans that never reached a worker.
"""
with rls_transaction(tenant_id):
ProwlerAPIAttackPathsScan.objects.filter(id=scan_pk).update(task_id=task_id)
def starting_attack_paths_scan(
attack_paths_scan: ProwlerAPIAttackPathsScan,
task_id: str,
cartography_config: CartographyConfig,
) -> None:
with rls_transaction(attack_paths_scan.tenant_id):
attack_paths_scan.task_id = task_id
attack_paths_scan.state = StateChoices.EXECUTING
attack_paths_scan.started_at = datetime.now(tz=timezone.utc)
attack_paths_scan.update_tag = cartography_config.update_tag
) -> bool:
"""Flip the row from `SCHEDULED` to `EXECUTING` atomically.
attack_paths_scan.save(
update_fields=[
"task_id",
"state",
"started_at",
"update_tag",
]
)
Returns `False` if the row is gone or has already moved past
`SCHEDULED` (e.g., periodic cleanup raced ahead and marked it
`FAILED` while the worker message was still in flight).
"""
with rls_transaction(attack_paths_scan.tenant_id):
try:
locked = ProwlerAPIAttackPathsScan.objects.select_for_update().get(
id=attack_paths_scan.id
)
except ProwlerAPIAttackPathsScan.DoesNotExist:
return False
if locked.state != StateChoices.SCHEDULED:
return False
locked.state = StateChoices.EXECUTING
locked.started_at = datetime.now(tz=timezone.utc)
locked.update_tag = cartography_config.update_tag
locked.save(update_fields=["state", "started_at", "update_tag"])
# Keep the in-memory object the caller is holding in sync.
attack_paths_scan.state = locked.state
attack_paths_scan.started_at = locked.started_at
attack_paths_scan.update_tag = locked.update_tag
return True
def _mark_scan_finished(
@@ -97,6 +97,19 @@ def run(tenant_id: str, scan_id: str, task_id: str) -> dict[str, Any]:
)
attack_paths_scan = db_utils.retrieve_attack_paths_scan(tenant_id, scan_id)
# Idempotency guard: cleanup may have flipped this row to a terminal state
# while the message was still in flight. Bail out before touching state.
if attack_paths_scan and attack_paths_scan.state in (
StateChoices.FAILED,
StateChoices.COMPLETED,
StateChoices.CANCELLED,
):
logger.warning(
f"Attack Paths scan {attack_paths_scan.id} already in terminal "
f"state {attack_paths_scan.state}; skipping execution"
)
return {}
# Checks before starting the scan
if not cartography_ingestion_function:
ingestion_exceptions = {
@@ -114,12 +127,17 @@ def run(tenant_id: str, scan_id: str, task_id: str) -> dict[str, Any]:
else:
if not attack_paths_scan:
# Safety net for in-flight messages or direct task invocations; dispatcher normally pre-creates the row.
logger.warning(
f"No Attack Paths Scan found for scan {scan_id} and tenant {tenant_id}, let's create it then"
)
attack_paths_scan = db_utils.create_attack_paths_scan(
tenant_id, scan_id, prowler_api_provider.id
)
if attack_paths_scan and task_id:
db_utils.set_attack_paths_scan_task_id(
tenant_id, attack_paths_scan.id, task_id
)
tmp_database_name = graph_database.get_database_name(
attack_paths_scan.id, temporary=True
@@ -141,9 +159,13 @@ def run(tenant_id: str, scan_id: str, task_id: str) -> dict[str, Any]:
)
# Starting the Attack Paths scan
db_utils.starting_attack_paths_scan(
attack_paths_scan, task_id, tenant_cartography_config
)
if not db_utils.starting_attack_paths_scan(
attack_paths_scan, tenant_cartography_config
):
logger.warning(
f"Attack Paths scan {attack_paths_scan.id} no longer in SCHEDULED state; cleanup likely raced ahead"
)
return {}
scan_t0 = time.perf_counter()
logger.info(
+181 -2
View File
@@ -10,16 +10,29 @@ from typing import Any
import sentry_sdk
from celery.utils.log import get_task_logger
from config.django.base import DJANGO_FINDINGS_BATCH_SIZE
from config.env import env
from config.settings.celery import CELERY_DEADLOCK_ATTEMPTS
from django.db import IntegrityError, OperationalError
from django.db.models import Case, Count, IntegerField, Max, Min, Prefetch, Q, Sum, When
from django.db.models import (
Case,
Count,
Exists,
IntegerField,
Max,
Min,
OuterRef,
Prefetch,
Q,
Sum,
When,
)
from django.utils import timezone as django_timezone
from tasks.jobs.queries import (
COMPLIANCE_UPSERT_PROVIDER_SCORE_SQL,
COMPLIANCE_UPSERT_TENANT_SUMMARY_SQL,
)
from tasks.utils import CustomEncoder
from tasks.utils import CustomEncoder, batched
from api.compliance import PROWLER_COMPLIANCE_OVERVIEW_TEMPLATE
from api.constants import SEVERITY_ORDER
@@ -2069,3 +2082,169 @@ def aggregate_finding_group_summaries(tenant_id: str, scan_id: str):
"created": created_count,
"updated": updated_count,
}
def reset_ephemeral_resource_findings_count(tenant_id: str, scan_id: str) -> dict:
"""Zero failed_findings_count for resources missing from a completed full-scope scan.
Resources that exist in the database for the scan's provider but were not
touched by this scan are treated as ephemeral. We keep their historical
findings, but reset the denormalized counter that drives the Resources page
sort so they stop ranking at the top.
Skipped (no-op) when:
- The scan is not in COMPLETED state.
- The scan ran with any scoping filter in scanner_args (partial scope).
Query design (must scale to 500k+ resources per provider):
Phase 1 — collect ephemeral IDs with one anti-join read.
Outer filter ``(tenant_id, provider_id, failed_findings_count > 0)``
uses ``resources_tenant_provider_idx``. The correlated
``NOT EXISTS`` subquery hits the implicit unique index
``(tenant_id, scan_id, resource_id)`` on ``ResourceScanSummary``.
``NOT EXISTS`` (vs ``NOT IN``) is null-safe and lets the planner
choose between hash anti-join and indexed nested-loop anti-join.
``.iterator(chunk_size=...)`` skips the queryset cache so memory
stays bounded while streaming UUIDs.
Phase 2 — UPDATE in fixed-size batches.
One large UPDATE would hold row-exclusive locks for seconds and
create a WAL spike. Batched UPDATEs by ``id__in`` (~1k rows each)
hit the primary key, keep each lock window ~50ms, bound WAL chunks,
and let other writers proceed between batches.
``failed_findings_count__gt=0`` in the UPDATE is idempotent under
concurrent scans and skips no-op rewrites.
Reads use the primary DB, not the replica: ``ResourceScanSummary`` rows
were written by the same scan task that triggered this one, so replica
lag could falsely classify scanned resources as ephemeral.
Scope detection (``Scan.is_full_scope()``) derives the set of scoping
scanner_args from ``prowler.lib.scan.scan.Scan.__init__`` via
introspection, so the API can never drift from the SDK's filter
contract. Imported scans are also rejected by trigger — they may only
cover a partial slice of resources.
"""
with rls_transaction(tenant_id):
scan = Scan.objects.filter(tenant_id=tenant_id, id=scan_id).first()
if scan is None:
logger.warning(f"Scan {scan_id} not found")
return {"status": "skipped", "reason": "scan not found"}
if scan.state != StateChoices.COMPLETED:
logger.info(f"Scan {scan_id} not completed; skipping ephemeral reset")
return {"status": "skipped", "reason": "scan not completed"}
if not scan.is_full_scope():
logger.info(
f"Scan {scan_id} ran with scoping filters; skipping ephemeral reset"
)
return {"status": "skipped", "reason": "partial scan scope"}
# Race protection: if a newer completed full-scope scan exists for this
# provider, our ResourceScanSummary set is stale relative to the resources'
# current failed_findings_count values (which the newer scan already
# refreshed). Wiping based on the older scan would zero counts the newer
# scan just set. Skip and let the newer scan's reset task do the work; if
# this task was delayed in the queue, that's the correct outcome.
# `completed_at__isnull=False` is required: Postgres orders NULL first in
# DESC, so a sibling COMPLETED scan with a missing completed_at would sort
# as "newest" and incorrectly cause us to skip.
with rls_transaction(tenant_id):
latest_full_scope_scan_id = (
Scan.objects.filter(
tenant_id=tenant_id,
provider_id=scan.provider_id,
state=StateChoices.COMPLETED,
completed_at__isnull=False,
)
.order_by("-completed_at", "-inserted_at")
.values_list("id", flat=True)
.first()
)
if latest_full_scope_scan_id != scan.id:
logger.info(
f"Scan {scan_id} is not the latest completed scan for provider "
f"{scan.provider_id}; skipping ephemeral reset"
)
return {"status": "skipped", "reason": "newer scan exists"}
# Defensive gate: ResourceScanSummary rows are written by perform_prowler_scan
# via best-effort bulk_create. If those writes failed silently (or the scan
# genuinely produced resources but no summaries were persisted), the
# ~Exists(in_scan) anti-join below would classify EVERY resource for this
# provider as ephemeral and zero their counts. Bail loudly instead.
with rls_transaction(tenant_id):
summaries_present = ResourceScanSummary.objects.filter(
tenant_id=tenant_id, scan_id=scan_id
).exists()
if scan.unique_resource_count > 0 and not summaries_present:
logger.error(
f"Scan {scan_id} reports {scan.unique_resource_count} unique "
f"resources but no ResourceScanSummary rows are persisted; "
f"skipping ephemeral reset to avoid wiping valid counts"
)
return {"status": "skipped", "reason": "summaries missing"}
# Stays on the primary DB intentionally. ResourceScanSummary rows are
# written by perform_prowler_scan in the same chain that triggered this
# task, so replica lag could return an empty/partial summary set; a stale
# read here would classify every Resource as ephemeral and wipe valid
# failed_findings_count values on the primary. Same rationale as
# update_provider_compliance_scores below in this module.
# Materializing the ID list (rather than streaming the iterator into
# batched UPDATEs) is intentional: it lets the UPDATEs run in their own
# short rls_transactions instead of one long transaction holding row locks
# on every batch. At 500k UUIDs the peak memory is ~40 MB — acceptable for
# a Celery worker — and is the better trade-off versus a multi-second
# write-lock window blocking concurrent scans.
with rls_transaction(tenant_id):
in_scan = ResourceScanSummary.objects.filter(
tenant_id=tenant_id,
scan_id=scan_id,
resource_id=OuterRef("pk"),
)
ephemeral_ids = list(
Resource.objects.filter(
tenant_id=tenant_id,
provider_id=scan.provider_id,
failed_findings_count__gt=0,
)
.filter(~Exists(in_scan))
.values_list("id", flat=True)
.iterator(chunk_size=DJANGO_FINDINGS_BATCH_SIZE)
)
if not ephemeral_ids:
logger.info(f"No ephemeral resources for scan {scan_id}")
return {
"status": "completed",
"scan_id": str(scan_id),
"provider_id": str(scan.provider_id),
"reset": 0,
}
total_updated = 0
for batch, _ in batched(ephemeral_ids, DJANGO_FINDINGS_BATCH_SIZE):
# batched() always yields a final tuple, which is empty when the input
# length is an exact multiple of the batch size. Skip it so we don't
# issue a no-op UPDATE ... WHERE id IN ().
if not batch:
continue
with rls_transaction(tenant_id):
total_updated += Resource.objects.filter(
tenant_id=tenant_id,
id__in=batch,
failed_findings_count__gt=0,
).update(failed_findings_count=0)
logger.info(
f"Ephemeral resource reset for scan {scan_id}: "
f"{total_updated} resources zeroed for provider {scan.provider_id}"
)
return {
"status": "completed",
"scan_id": str(scan_id),
"provider_id": str(scan.provider_id),
"reset": total_updated,
}
+53 -2
View File
@@ -58,6 +58,7 @@ from tasks.jobs.scan import (
aggregate_findings,
create_compliance_requirements,
perform_prowler_scan,
reset_ephemeral_resource_findings_count,
update_provider_compliance_scores,
)
from tasks.utils import (
@@ -77,6 +78,7 @@ from prowler.lib.check.compliance_models import Compliance
from prowler.lib.outputs.compliance.generic.generic import GenericCompliance
from prowler.lib.outputs.finding import Finding as FindingOutput
logger = get_task_logger(__name__)
@@ -158,6 +160,13 @@ def _perform_scan_complete_tasks(tenant_id: str, scan_id: str, provider_id: str)
generate_outputs_task.si(
scan_id=scan_id, provider_id=provider_id, tenant_id=tenant_id
),
# post-scan task — runs in the parallel group so a
# failure cannot cascade into reports or integrations. Its only
# prerequisite is that perform_prowler_scan has committed
# ResourceScanSummary, which is true by the time this chain fires.
reset_ephemeral_resource_findings_count_task.si(
tenant_id=tenant_id, scan_id=scan_id
),
),
group(
# Use optimized task that generates both reports with shared queries
@@ -173,10 +182,25 @@ def _perform_scan_complete_tasks(tenant_id: str, scan_id: str, provider_id: str)
).apply_async()
if can_provider_run_attack_paths_scan(tenant_id, provider_id):
perform_attack_paths_scan_task.apply_async(
# Row is normally created upstream, so this is a safeguard so we can attach the task id below
attack_paths_scan = attack_paths_db_utils.retrieve_attack_paths_scan(
tenant_id, scan_id
)
if attack_paths_scan is None:
attack_paths_scan = attack_paths_db_utils.create_attack_paths_scan(
tenant_id, scan_id, provider_id
)
# Persist the Celery task id so the periodic cleanup can revoke scans stuck in SCHEDULED
result = perform_attack_paths_scan_task.apply_async(
kwargs={"tenant_id": tenant_id, "scan_id": scan_id}
)
if attack_paths_scan and result:
attack_paths_db_utils.set_attack_paths_scan_task_id(
tenant_id, attack_paths_scan.id, result.task_id
)
@shared_task(base=RLSTask, name="provider-connection-check")
@set_tenant
@@ -378,7 +402,8 @@ class AttackPathsScanRLSTask(RLSTask):
SDK initialization, or Neo4j configuration errors during setup).
"""
def on_failure(self, exc, task_id, args, kwargs, _einfo):
def on_failure(self, exc, task_id, args, kwargs, _einfo): # noqa: ARG002
del args # Required by Celery's Task.on_failure signature; not used.
tenant_id = kwargs.get("tenant_id")
scan_id = kwargs.get("scan_id")
@@ -775,6 +800,32 @@ def aggregate_daily_severity_task(tenant_id: str, scan_id: str):
return aggregate_daily_severity(tenant_id=tenant_id, scan_id=scan_id)
@shared_task(name="scan-reset-ephemeral-resources", queue="overview")
@handle_provider_deletion
def reset_ephemeral_resource_findings_count_task(tenant_id: str, scan_id: str):
"""Reset failed_findings_count for resources missing from a completed full-scope scan.
Failures are swallowed and returned as a status: this task lives inside the
post-scan group, and Celery propagates group-member exceptions into the next
chain step meaning a crash here would block compliance reports and
integrations. The reset is purely cosmetic (UI sort optimization), so a
bad run is logged and absorbed rather than allowed to cascade.
"""
try:
return reset_ephemeral_resource_findings_count(
tenant_id=tenant_id, scan_id=scan_id
)
except Exception as exc: # noqa: BLE001 — intentionally broad
logger.exception(
f"reset_ephemeral_resource_findings_count failed for scan {scan_id}: {exc}"
)
return {
"status": "failed",
"scan_id": str(scan_id),
"reason": str(exc),
}
@shared_task(base=RLSTask, name="scan-finding-group-summaries", queue="overview")
@set_tenant(keep_tenant=True)
@handle_provider_deletion
@@ -135,7 +135,7 @@ class TestAttackPathsRun:
assert result == ingestion_result
mock_retrieve_scan.assert_called_once_with(str(tenant.id), str(scan.id))
mock_starting.assert_called_once()
config = mock_starting.call_args[0][2]
config = mock_starting.call_args[0][1]
assert config.neo4j_database == "tenant-db"
mock_get_db_name.assert_has_calls(
[call(attack_paths_scan.id, temporary=True), call(provider.tenant_id)]
@@ -2732,3 +2732,143 @@ class TestCleanupStaleAttackPathsScans:
assert result["cleaned_up_count"] == 2
# Worker should be pinged exactly once — cache prevents second ping
mock_alive.assert_called_once_with("shared-worker@host")
# `SCHEDULED` state cleanup
def _create_scheduled_scan(
self,
tenant,
provider,
*,
age_minutes,
parent_state,
with_task=True,
):
"""Create a SCHEDULED AttackPathsScan with a parent Scan in `parent_state`.
`age_minutes` controls how far in the past `started_at` is set, so
callers can place rows safely past the cleanup cutoff.
"""
parent_scan = Scan.objects.create(
name="Parent Prowler scan",
provider=provider,
trigger=Scan.TriggerChoices.MANUAL,
state=parent_state,
tenant_id=tenant.id,
)
ap_scan = AttackPathsScan.objects.create(
tenant_id=tenant.id,
provider=provider,
scan=parent_scan,
state=StateChoices.SCHEDULED,
started_at=datetime.now(tz=timezone.utc) - timedelta(minutes=age_minutes),
)
task_result = None
if with_task:
task_result = TaskResult.objects.create(
task_id=str(ap_scan.id),
task_name="attack-paths-scan-perform",
status="PENDING",
)
task = Task.objects.create(
id=task_result.task_id,
task_runner_task=task_result,
tenant_id=tenant.id,
)
ap_scan.task = task
ap_scan.save(update_fields=["task_id"])
return ap_scan, task_result
@patch("tasks.jobs.attack_paths.cleanup.recover_graph_data_ready")
@patch("tasks.jobs.attack_paths.cleanup.graph_database.drop_database")
@patch(
"tasks.jobs.attack_paths.cleanup.rls_transaction",
new=lambda *args, **kwargs: nullcontext(),
)
@patch("tasks.jobs.attack_paths.cleanup._revoke_task")
def test_cleans_up_scheduled_scan_when_parent_is_terminal(
self,
mock_revoke,
mock_drop_db,
mock_recover,
tenants_fixture,
providers_fixture,
):
from tasks.jobs.attack_paths.cleanup import cleanup_stale_attack_paths_scans
tenant = tenants_fixture[0]
provider = providers_fixture[0]
provider.provider = Provider.ProviderChoices.AWS
provider.save()
ap_scan, task_result = self._create_scheduled_scan(
tenant,
provider,
age_minutes=24 * 60 * 3, # 3 days, safely past any threshold
parent_state=StateChoices.FAILED,
)
result = cleanup_stale_attack_paths_scans()
assert result["cleaned_up_count"] == 1
assert str(ap_scan.id) in result["scan_ids"]
ap_scan.refresh_from_db()
assert ap_scan.state == StateChoices.FAILED
assert ap_scan.progress == 100
assert ap_scan.completed_at is not None
assert ap_scan.ingestion_exceptions == {
"global_error": "Scan never started — cleaned up by periodic task"
}
# SCHEDULED revoke must NOT terminate a running worker
mock_revoke.assert_called_once()
assert mock_revoke.call_args.kwargs == {"terminate": False}
# Temp DB never created for SCHEDULED, so no drop attempted
mock_drop_db.assert_not_called()
# Tenant Neo4j data is untouched in this path
mock_recover.assert_not_called()
task_result.refresh_from_db()
assert task_result.status == "FAILURE"
assert task_result.date_done is not None
@patch("tasks.jobs.attack_paths.cleanup.recover_graph_data_ready")
@patch("tasks.jobs.attack_paths.cleanup.graph_database.drop_database")
@patch(
"tasks.jobs.attack_paths.cleanup.rls_transaction",
new=lambda *args, **kwargs: nullcontext(),
)
@patch("tasks.jobs.attack_paths.cleanup._revoke_task")
def test_skips_scheduled_scan_when_parent_still_in_flight(
self,
mock_revoke,
mock_drop_db,
mock_recover,
tenants_fixture,
providers_fixture,
):
from tasks.jobs.attack_paths.cleanup import cleanup_stale_attack_paths_scans
tenant = tenants_fixture[0]
provider = providers_fixture[0]
provider.provider = Provider.ProviderChoices.AWS
provider.save()
ap_scan, _ = self._create_scheduled_scan(
tenant,
provider,
age_minutes=24 * 60 * 3,
parent_state=StateChoices.EXECUTING,
)
result = cleanup_stale_attack_paths_scans()
assert result["cleaned_up_count"] == 0
ap_scan.refresh_from_db()
assert ap_scan.state == StateChoices.SCHEDULED
mock_revoke.assert_not_called()
+314
View File
@@ -24,6 +24,7 @@ from tasks.jobs.scan import (
aggregate_findings,
create_compliance_requirements,
perform_prowler_scan,
reset_ephemeral_resource_findings_count,
update_provider_compliance_scores,
)
from tasks.utils import CustomEncoder
@@ -35,6 +36,7 @@ from api.models import (
MuteRule,
Provider,
Resource,
ResourceScanSummary,
Scan,
ScanSummary,
StateChoices,
@@ -4335,3 +4337,315 @@ class TestUpdateProviderComplianceScores:
assert any("provider_compliance_scores" in c for c in calls)
assert any("tenant_compliance_summaries" in c for c in calls)
assert any("pg_advisory_xact_lock" in c for c in calls)
class TestScanIsFullScope:
def _live_trigger(self):
return Scan.TriggerChoices.MANUAL
@pytest.mark.parametrize(
"scanner_args",
[
{},
{"unrelated": "value"},
{"checks": None},
{"services": []},
{"severities": ""},
],
)
def test_full_scope_when_no_filters_present(self, scanner_args):
scan = Scan(scanner_args=scanner_args, trigger=self._live_trigger())
assert scan.is_full_scope() is True
def test_full_scope_covers_every_sdk_kwarg(self):
# Lock the predicate to whatever ProwlerScan's __init__ exposes today.
# If the SDK adds a new filter, this test still passes via the
# introspection-driven derivation; if it adds a non-filter kwarg
# (e.g. provider-like), keep the exclusion list in sync in models.py.
from prowler.lib.scan.scan import Scan as ProwlerScan
import inspect
expected = tuple(
name
for name in inspect.signature(ProwlerScan.__init__).parameters
if name not in ("self", "provider")
)
assert Scan.get_scoping_scanner_arg_keys() == expected
# Spot-check a few well-known filters survive the introspection.
assert "checks" in expected
assert "services" in expected
assert "severities" in expected
def test_partial_scope_for_each_sdk_filter(self):
for key in Scan.get_scoping_scanner_arg_keys():
scan = Scan(scanner_args={key: ["x"]}, trigger=self._live_trigger())
assert scan.is_full_scope() is False, f"{key} should mark scan as partial"
def test_imported_scan_is_never_full_scope(self):
# Forward-defensive: any trigger outside LIVE_SCAN_TRIGGERS (e.g. a
# future "imported" trigger) must never qualify, even with empty args.
scan = Scan(scanner_args={}, trigger="imported")
assert scan.is_full_scope() is False
def test_handles_none_scanner_args(self):
scan = Scan(scanner_args=None, trigger=self._live_trigger())
assert scan.is_full_scope() is True
@pytest.mark.django_db
class TestResetEphemeralResourceFindingsCount:
def _make_scan_summary(self, tenant_id, scan_id, resource):
return ResourceScanSummary.objects.create(
tenant_id=tenant_id,
scan_id=scan_id,
resource_id=resource.id,
service=resource.service,
region=resource.region,
resource_type=resource.type,
)
def test_resets_only_resources_missing_from_full_scope_scan(
self, tenants_fixture, scans_fixture, providers_fixture, resources_fixture
):
tenant, *_ = tenants_fixture
scan1, scan2, *_ = scans_fixture
resource1, resource2, resource3 = resources_fixture
Resource.objects.filter(id=resource1.id).update(failed_findings_count=3)
Resource.objects.filter(id=resource2.id).update(failed_findings_count=5)
Resource.objects.filter(id=resource3.id).update(failed_findings_count=7)
# Only resource1 was scanned in scan1; resource2 is ephemeral.
self._make_scan_summary(tenant.id, scan1.id, resource1)
result = reset_ephemeral_resource_findings_count(
tenant_id=str(tenant.id), scan_id=str(scan1.id)
)
assert result["status"] == "completed"
assert result["reset"] == 1
resource1.refresh_from_db()
resource2.refresh_from_db()
resource3.refresh_from_db()
assert resource1.failed_findings_count == 3
assert resource2.failed_findings_count == 0
# Other provider's resource is never touched.
assert resource3.failed_findings_count == 7
def test_skips_when_scan_not_completed(
self, tenants_fixture, scans_fixture, resources_fixture
):
tenant, *_ = tenants_fixture
scan1, *_ = scans_fixture
resource1, resource2, _ = resources_fixture
Scan.objects.filter(id=scan1.id).update(state=StateChoices.EXECUTING)
Resource.objects.filter(id=resource2.id).update(failed_findings_count=5)
result = reset_ephemeral_resource_findings_count(
tenant_id=str(tenant.id), scan_id=str(scan1.id)
)
assert result["status"] == "skipped"
assert result["reason"] == "scan not completed"
resource2.refresh_from_db()
assert resource2.failed_findings_count == 5
def test_skips_when_scan_has_scoping_filters(
self, tenants_fixture, scans_fixture, resources_fixture
):
tenant, *_ = tenants_fixture
scan1, *_ = scans_fixture
_, resource2, _ = resources_fixture
Scan.objects.filter(id=scan1.id).update(scanner_args={"checks": ["check1"]})
Resource.objects.filter(id=resource2.id).update(failed_findings_count=5)
result = reset_ephemeral_resource_findings_count(
tenant_id=str(tenant.id), scan_id=str(scan1.id)
)
assert result["status"] == "skipped"
assert result["reason"] == "partial scan scope"
resource2.refresh_from_db()
assert resource2.failed_findings_count == 5
def test_skips_when_scan_not_found(self, tenants_fixture):
tenant, *_ = tenants_fixture
result = reset_ephemeral_resource_findings_count(
tenant_id=str(tenant.id), scan_id=str(uuid.uuid4())
)
assert result["status"] == "skipped"
assert result["reason"] == "scan not found"
def test_skips_when_newer_scan_completed_for_same_provider(
self, tenants_fixture, scans_fixture, providers_fixture, resources_fixture
):
# If a newer completed scan exists for the same provider, our
# ResourceScanSummary set is stale relative to the resources' current
# counts, and applying the diff would corrupt them.
from datetime import timedelta
tenant, *_ = tenants_fixture
scan1, *_ = scans_fixture
provider, *_ = providers_fixture
_, resource2, _ = resources_fixture
Resource.objects.filter(id=resource2.id).update(failed_findings_count=5)
# Create a newer COMPLETED scan for the same provider, with an
# explicit completed_at strictly after scan1's so ordering is
# deterministic regardless of clock resolution.
newer_completed_at = scan1.completed_at + timedelta(minutes=5)
Scan.objects.create(
name="Newer Scan",
provider=provider,
trigger=Scan.TriggerChoices.MANUAL,
state=StateChoices.COMPLETED,
tenant_id=tenant.id,
started_at=newer_completed_at,
completed_at=newer_completed_at,
)
result = reset_ephemeral_resource_findings_count(
tenant_id=str(tenant.id), scan_id=str(scan1.id)
)
assert result["status"] == "skipped"
assert result["reason"] == "newer scan exists"
resource2.refresh_from_db()
assert resource2.failed_findings_count == 5
def test_does_not_touch_other_providers_resources(
self, tenants_fixture, scans_fixture, providers_fixture, resources_fixture
):
tenant, *_ = tenants_fixture
scan1, *_ = scans_fixture
_, _, resource3 = resources_fixture
# resource3 belongs to provider2 with failed_findings_count > 0 and is
# not in scan1's summary. It MUST NOT be reset.
Resource.objects.filter(id=resource3.id).update(failed_findings_count=9)
result = reset_ephemeral_resource_findings_count(
tenant_id=str(tenant.id), scan_id=str(scan1.id)
)
assert result["status"] == "completed"
assert result["reset"] == 0
resource3.refresh_from_db()
assert resource3.failed_findings_count == 9
def test_resources_already_zero_are_not_rewritten(
self, tenants_fixture, scans_fixture, resources_fixture
):
tenant, *_ = tenants_fixture
scan1, *_ = scans_fixture
resource1, resource2, _ = resources_fixture
# Both resources already at 0, neither in summary -> nothing to update.
Resource.objects.filter(id=resource1.id).update(failed_findings_count=0)
Resource.objects.filter(id=resource2.id).update(failed_findings_count=0)
result = reset_ephemeral_resource_findings_count(
tenant_id=str(tenant.id), scan_id=str(scan1.id)
)
assert result["status"] == "completed"
assert result["reset"] == 0
def test_skips_when_summaries_missing_for_scan_with_resources(
self, tenants_fixture, scans_fixture, resources_fixture
):
# Catastrophic guard: if a scan reports unique_resource_count > 0 but
# no ResourceScanSummary rows are persisted (e.g. bulk_create silently
# failed), the anti-join would classify EVERY resource as ephemeral
# and zero their counts. The gate must skip and preserve the data.
tenant, *_ = tenants_fixture
scan1, *_ = scans_fixture
resource1, resource2, _ = resources_fixture
Scan.objects.filter(id=scan1.id).update(unique_resource_count=10)
Resource.objects.filter(id=resource1.id).update(failed_findings_count=3)
Resource.objects.filter(id=resource2.id).update(failed_findings_count=5)
result = reset_ephemeral_resource_findings_count(
tenant_id=str(tenant.id), scan_id=str(scan1.id)
)
assert result["status"] == "skipped"
assert result["reason"] == "summaries missing"
resource1.refresh_from_db()
resource2.refresh_from_db()
assert resource1.failed_findings_count == 3
assert resource2.failed_findings_count == 5
def test_ignores_sibling_scan_with_null_completed_at(
self, tenants_fixture, scans_fixture, providers_fixture, resources_fixture
):
# Postgres orders NULL first in DESC; a sibling COMPLETED scan with a
# missing completed_at must not be treated as the latest scan and
# cause us to incorrectly skip the reset.
tenant, *_ = tenants_fixture
scan1, *_ = scans_fixture
provider, *_ = providers_fixture
resource1, resource2, _ = resources_fixture
Resource.objects.filter(id=resource2.id).update(failed_findings_count=5)
self._make_scan_summary(tenant.id, scan1.id, resource1)
Scan.objects.create(
name="Ghost Scan",
provider=provider,
trigger=Scan.TriggerChoices.MANUAL,
state=StateChoices.COMPLETED,
tenant_id=tenant.id,
started_at=scan1.completed_at,
completed_at=None,
)
result = reset_ephemeral_resource_findings_count(
tenant_id=str(tenant.id), scan_id=str(scan1.id)
)
assert result["status"] == "completed"
assert result["reset"] == 1
resource2.refresh_from_db()
assert resource2.failed_findings_count == 0
def test_batches_updates_when_many_ephemeral_resources(
self, tenants_fixture, scans_fixture, resources_fixture
):
# Forces multiple batches to confirm the chunked UPDATE path executes
# cleanly and the count is the sum across batches.
tenant, *_ = tenants_fixture
scan1, *_ = scans_fixture
resource1, resource2, _ = resources_fixture
Resource.objects.filter(id=resource1.id).update(failed_findings_count=2)
Resource.objects.filter(id=resource2.id).update(failed_findings_count=4)
# No ResourceScanSummary -> both resource1 and resource2 are ephemeral.
# Force a 1-row batch via the shared findings batch size knob.
with patch("tasks.jobs.scan.DJANGO_FINDINGS_BATCH_SIZE", 1):
result = reset_ephemeral_resource_findings_count(
tenant_id=str(tenant.id), scan_id=str(scan1.id)
)
assert result["status"] == "completed"
assert result["reset"] == 2
resource1.refresh_from_db()
resource2.refresh_from_db()
assert resource1.failed_findings_count == 0
assert resource2.failed_findings_count == 0
+66
View File
@@ -842,6 +842,72 @@ class TestScanCompleteTasks:
# Attack Paths task should be skipped when provider cannot run it
mock_attack_paths_task.assert_not_called()
@pytest.mark.parametrize(
"row_pre_existing",
[True, False],
ids=["row-pre-existing", "row-missing-fallback"],
)
@patch("tasks.tasks.aggregate_attack_surface_task.apply_async")
@patch("tasks.tasks.chain")
@patch("tasks.tasks.create_compliance_requirements_task.si")
@patch("tasks.tasks.update_provider_compliance_scores_task.si")
@patch("tasks.tasks.perform_scan_summary_task.si")
@patch("tasks.tasks.generate_outputs_task.si")
@patch("tasks.tasks.generate_compliance_reports_task.si")
@patch("tasks.tasks.check_integrations_task.si")
@patch("tasks.tasks.attack_paths_db_utils.set_attack_paths_scan_task_id")
@patch("tasks.tasks.attack_paths_db_utils.create_attack_paths_scan")
@patch("tasks.tasks.attack_paths_db_utils.retrieve_attack_paths_scan")
@patch("tasks.tasks.perform_attack_paths_scan_task.apply_async")
@patch("tasks.tasks.can_provider_run_attack_paths_scan", return_value=True)
def test_scan_complete_dispatches_attack_paths_scan(
self,
_mock_can_run_attack_paths,
mock_attack_paths_task,
mock_retrieve,
mock_create,
mock_set_task_id,
mock_check_integrations_task,
mock_compliance_reports_task,
mock_outputs_task,
mock_scan_summary_task,
mock_update_compliance_scores_task,
mock_compliance_requirements_task,
mock_chain,
mock_attack_surface_task,
row_pre_existing,
):
"""When a provider can run Attack Paths, dispatch must:
1. Reuse the existing row or create one if missing.
2. Call apply_async on the Attack Paths task.
3. Persist the returned Celery task id on the row.
"""
existing_row = MagicMock(id="ap-scan-id")
if row_pre_existing:
mock_retrieve.return_value = existing_row
else:
mock_retrieve.return_value = None
mock_create.return_value = existing_row
async_result = MagicMock(task_id="celery-task-id")
mock_attack_paths_task.return_value = async_result
_perform_scan_complete_tasks("tenant-id", "scan-id", "provider-id")
mock_retrieve.assert_called_once_with("tenant-id", "scan-id")
if row_pre_existing:
mock_create.assert_not_called()
else:
mock_create.assert_called_once_with("tenant-id", "scan-id", "provider-id")
mock_attack_paths_task.assert_called_once_with(
kwargs={"tenant_id": "tenant-id", "scan_id": "scan-id"}
)
mock_set_task_id.assert_called_once_with(
"tenant-id", "ap-scan-id", "celery-task-id"
)
class TestAttackPathsTasks:
@staticmethod
-316
View File
@@ -1,316 +0,0 @@
# Prowler ASPM Agent Manifest
# ============================================================
# Describe each AI agent deployment and its security posture.
# Run: prowler aspm --manifest-path aspm-manifest.yaml
#
# All fields under each section correspond to ASPM checks.
# Set values that accurately reflect your deployment so that
# Prowler can generate an accurate posture report.
# ============================================================
agents:
# ----------------------------------------------------------
# Example 1: A well-configured production LLM agent
# ----------------------------------------------------------
- id: agent-docrecommender-prod
name: agent-docrecommender-prod
environment: prod
cloud_provider: aws
region: us-east-1
# 1.1 Identity & Authentication (ASPM-001 to ASPM-012)
identity:
type: iam_role
arn: arn:aws:iam::123456789012:role/agent-docrecommender-prod
tags:
agent: "true"
owner: team-ai
env: prod
purpose: document-recommendation
criticality: high
created_at: "2025-01-15"
last_used: "2026-03-20"
uses_oidc: true
uses_static_credentials: false
credential_age_days: 30
rotation_policy_days: 30
naming_compliant: true
has_owner_tag: true
cross_cloud_registered: true
jwt_validation_enabled: true
session_duration_seconds: 3600
has_deprovisioning_record: true
oauth_scope_minimal: true
unused_secondary_credentials: false
# 1.2 Permissions & Least Privilege (ASPM-013 to ASPM-025)
permissions:
has_wildcard_actions: false
has_wildcard_resources: false
has_admin_policy: false
has_inline_policies: false
can_escalate_privileges: false
cross_account_access: false
cross_account_accounts: 0
has_permission_boundary: true
shares_role_with_human: false
session_duration_seconds: 3600
permissions_last_reviewed_days: 45
data_domains_accessed:
- s3
has_condition_on_sensitive_actions: true
all_resources_tagged: true
permission_changes_approved: true
# 1.3 Credential Management (ASPM-026 to ASPM-036)
credentials:
has_hardcoded_secrets: false
credentials_in_logs: false
uses_secrets_manager: true
api_key_in_vcs: false
rotation_interval_days: 30
secrets_in_iac: false
database_uses_proxy: true
third_party_keys_managed: true
credential_access_audit_trail: true
credentials_scoped: true
credentials_per_environment: true
# 1.4 Network & Communication Security (ASPM-037 to ASPM-046)
network:
uses_https_only: true
mtls_enforced: true
api_calls_authenticated: true
has_rate_limiting: true
has_egress_filtering: true
network_isolated: true
api_gateway_enforced: true
validates_tls_certificates: true
network_calls_logged: true
uses_dnssec: true
validates_webhooks: true
# 1.5 Data Access & Privacy (ASPM-047 to ASPM-057)
data_access:
accesses_pii: false
has_dlp_controls: true
data_encrypted_at_rest: true
data_encrypted_in_transit: true
cross_boundary_data_flows_approved: true
training_data_integrity_verified: true
data_retention_policy_days: 90
database_query_audit_enabled: true
object_storage_access_logged: true
llm_context_sanitized: true
has_model_card: true
output_validated_for_sensitive_data: true
supports_data_subject_rights: true
# 1.6 Runtime & Sandbox Security (ASPM-058 to ASPM-067)
runtime:
runs_as_root: false
privileged_container: false
has_seccomp_profile: true
has_apparmor_selinux: true
has_resource_limits: true
image_scanned_for_cves: true
has_runtime_monitoring: true
execution_environment_versioned: true
secrets_cleared_from_memory: true
has_execution_timeout: true
behavior_deterministic: true
dependencies_integrity_checked: true
uses_platform_security_controls: true
# 1.7 Supply Chain Security (ASPM-068 to ASPM-076)
supply_chain:
framework_cves_scanned: true
llm_model_provenance_verified: true
plugins_security_reviewed: true
dependencies_version_pinned: true
artifacts_signed: true
cicd_has_security_gates: true
licenses_compliant: true
model_update_cadence_days: 30
dependency_checksums_verified: true
# 1.8 Observability & Monitoring (ASPM-077 to ASPM-086)
observability:
execution_logs_complete: true
anomaly_detection_enabled: true
prompt_injection_monitoring: true
audit_logs_immutable: true
metrics_exported: true
security_event_alerting: true
distributed_tracing_enabled: true
centralized_dashboard: true
configuration_drift_tracked: true
performance_baseline_defined: true
# 1.9 Compliance & Governance (ASPM-087 to ASPM-095)
compliance:
owasp_llm_top10_assessed: true
eu_ai_act_controls_present: true
nist_ai_rmf_assessed: true
access_control_policy_enforced: true
dpia_completed: true
regulatory_requirements_mapped: true
incident_response_plan_exists: true
third_party_vendors_assessed: true
user_consent_and_disclosure: true
# 1.10 Attack Path Analysis (ASPM-096 to ASPM-101)
attack_paths:
cross_cloud_escalation_possible: false
tool_abuse_escalation_possible: false
sensitive_data_enables_downstream_compromise: false
lateral_movement_via_shared_infra: false
compromise_enables_full_account_takeover: false
llm_output_used_in_code_execution: false
# ----------------------------------------------------------
# Example 2: A poorly-configured staging agent (many FAILs)
# ----------------------------------------------------------
- id: agent-dataprocessor-staging
name: agent-dataprocessor-staging
environment: staging
cloud_provider: aws
region: eu-west-1
identity:
type: iam_role
arn: arn:aws:iam::123456789012:role/DataProcessorRole
tags: {} # FAIL: ASPM-001 (no tags)
created_at: "2023-06-01"
last_used: "2025-12-01"
uses_oidc: false # FAIL: ASPM-011
uses_static_credentials: true # FAIL: ASPM-011
credential_age_days: 660 # FAIL: ASPM-007 (>365 days)
rotation_policy_days: null # FAIL: ASPM-004
naming_compliant: false # FAIL: ASPM-002 (generic name)
has_owner_tag: false # FAIL: ASPM-009
cross_cloud_registered: false # FAIL: ASPM-003
jwt_validation_enabled: false # FAIL: ASPM-006
session_duration_seconds: 43200 # FAIL: ASPM-010 (12 hours)
has_deprovisioning_record: false # FAIL: ASPM-008
oauth_scope_minimal: false # FAIL: ASPM-005
unused_secondary_credentials: true # FAIL: ASPM-012
permissions:
has_wildcard_actions: true # FAIL: ASPM-013, ASPM-016
has_wildcard_resources: true # FAIL: ASPM-016
has_admin_policy: true # FAIL: ASPM-013
has_inline_policies: true # FAIL: ASPM-014
can_escalate_privileges: true # FAIL: ASPM-015
cross_account_access: true
cross_account_accounts: 5 # FAIL: ASPM-017 (>3)
has_permission_boundary: false # FAIL: ASPM-022
shares_role_with_human: true # FAIL: ASPM-023
session_duration_seconds: null # FAIL: ASPM-024
permissions_last_reviewed_days: null # FAIL: ASPM-018
data_domains_accessed:
- s3
- rds
- redshift # FAIL: ASPM-020 (>1 domain)
has_condition_on_sensitive_actions: false # FAIL: ASPM-019
all_resources_tagged: false # FAIL: ASPM-021
permission_changes_approved: false # FAIL: ASPM-025
credentials:
has_hardcoded_secrets: true # FAIL: ASPM-026
credentials_in_logs: true # FAIL: ASPM-027
uses_secrets_manager: false # FAIL: ASPM-028
api_key_in_vcs: true # FAIL: ASPM-029
rotation_interval_days: null # FAIL: ASPM-030
secrets_in_iac: true # FAIL: ASPM-031
database_uses_proxy: false # FAIL: ASPM-032
third_party_keys_managed: false # FAIL: ASPM-033
credential_access_audit_trail: false # FAIL: ASPM-034
credentials_scoped: false # FAIL: ASPM-035
credentials_per_environment: false # FAIL: ASPM-036
network:
uses_https_only: false # FAIL: ASPM-037
mtls_enforced: false # FAIL: ASPM-037
api_calls_authenticated: false # FAIL: ASPM-038
has_rate_limiting: false # FAIL: ASPM-039
has_egress_filtering: false # FAIL: ASPM-040
network_isolated: false # FAIL: ASPM-041
api_gateway_enforced: false # FAIL: ASPM-042
validates_tls_certificates: false # FAIL: ASPM-043
network_calls_logged: false # FAIL: ASPM-044
uses_dnssec: false # FAIL: ASPM-045
validates_webhooks: false # FAIL: ASPM-046
data_access:
accesses_pii: true
has_dlp_controls: false # FAIL: ASPM-047
data_encrypted_at_rest: false # FAIL: ASPM-048
data_encrypted_in_transit: false # FAIL: ASPM-048
cross_boundary_data_flows_approved: false # FAIL: ASPM-049
training_data_integrity_verified: false # FAIL: ASPM-050
data_retention_policy_days: null # FAIL: ASPM-051
database_query_audit_enabled: false # FAIL: ASPM-052
object_storage_access_logged: false # FAIL: ASPM-053
llm_context_sanitized: false # FAIL: ASPM-054
has_model_card: false # FAIL: ASPM-055
output_validated_for_sensitive_data: false # FAIL: ASPM-056
supports_data_subject_rights: false # FAIL: ASPM-057
runtime:
runs_as_root: true # FAIL: ASPM-058
privileged_container: true # FAIL: ASPM-058
has_seccomp_profile: false # FAIL: ASPM-058
has_apparmor_selinux: false # FAIL: ASPM-058
has_resource_limits: false # FAIL: ASPM-059
image_scanned_for_cves: false # FAIL: ASPM-060
has_runtime_monitoring: false # FAIL: ASPM-061
execution_environment_versioned: false # FAIL: ASPM-062
secrets_cleared_from_memory: false # FAIL: ASPM-063
has_execution_timeout: false # FAIL: ASPM-064
behavior_deterministic: false # FAIL: ASPM-065
dependencies_integrity_checked: false # FAIL: ASPM-066
uses_platform_security_controls: false # FAIL: ASPM-067
supply_chain:
framework_cves_scanned: false # FAIL: ASPM-068
llm_model_provenance_verified: false # FAIL: ASPM-069
plugins_security_reviewed: false # FAIL: ASPM-070
dependencies_version_pinned: false # FAIL: ASPM-071
artifacts_signed: false # FAIL: ASPM-072
cicd_has_security_gates: false # FAIL: ASPM-073
licenses_compliant: false # FAIL: ASPM-074
model_update_cadence_days: null # FAIL: ASPM-075
dependency_checksums_verified: false # FAIL: ASPM-076
observability:
execution_logs_complete: false # FAIL: ASPM-077
anomaly_detection_enabled: false # FAIL: ASPM-078
prompt_injection_monitoring: false # FAIL: ASPM-079
audit_logs_immutable: false # FAIL: ASPM-080
metrics_exported: false # FAIL: ASPM-081
security_event_alerting: false # FAIL: ASPM-082
distributed_tracing_enabled: false # FAIL: ASPM-083
centralized_dashboard: false # FAIL: ASPM-084
configuration_drift_tracked: false # FAIL: ASPM-085
performance_baseline_defined: false # FAIL: ASPM-086
compliance:
owasp_llm_top10_assessed: false # FAIL: ASPM-087
eu_ai_act_controls_present: false # FAIL: ASPM-088
nist_ai_rmf_assessed: false # FAIL: ASPM-089
access_control_policy_enforced: false # FAIL: ASPM-090
dpia_completed: false # FAIL: ASPM-091
regulatory_requirements_mapped: false # FAIL: ASPM-092
incident_response_plan_exists: false # FAIL: ASPM-093
third_party_vendors_assessed: false # FAIL: ASPM-094
user_consent_and_disclosure: false # FAIL: ASPM-095
attack_paths:
cross_cloud_escalation_possible: true # FAIL: ASPM-096
tool_abuse_escalation_possible: true # FAIL: ASPM-097
sensitive_data_enables_downstream_compromise: true # FAIL: ASPM-098
lateral_movement_via_shared_infra: true # FAIL: ASPM-099
compromise_enables_full_account_takeover: true # FAIL: ASPM-100
llm_output_used_in_code_execution: true # FAIL: ASPM-101
@@ -121,8 +121,8 @@ To update the environment file:
Edit the `.env` file and change version values:
```env
PROWLER_UI_VERSION="5.24.0"
PROWLER_API_VERSION="5.24.0"
PROWLER_UI_VERSION="5.25.1"
PROWLER_API_VERSION="5.25.1"
```
<Note>
@@ -227,6 +227,7 @@ Assign administrative permissions by selecting from the following options:
| Manage Integrations | All | Add or modify the Prowler Integrations. |
| Manage Ingestions | Prowler Cloud | Allow or deny the ability to submit findings ingestion batches via the API. |
| Manage Billing | Prowler Cloud | Access and manage billing settings and subscription information. |
| Manage Alerts | Prowler Cloud | Create, edit, and delete alert rules and recipients. |
<Note>
The **Scope** column indicates where each permission applies. **All** means the permission is available in both Prowler Cloud and Self-Managed deployments. **Prowler Cloud** indicates permissions that are specific to [Prowler Cloud](https://cloud.prowler.com/sign-in).
@@ -241,3 +242,5 @@ The following permissions are available exclusively in **Prowler Cloud**:
**Manage Ingestions:** Submit and manage findings ingestion jobs via the API. Required to upload OCSF scan results using the `--push-to-cloud` CLI flag or the ingestion endpoints. See [Import Findings](/user-guide/tutorials/prowler-app-import-findings) for details.
**Manage Billing:** Access and manage billing settings, subscription plans, and payment methods.
**Manage Alerts:** Create, edit, and delete alert rules and recipients used to deliver scan-result digests via email.
+42 -7
View File
@@ -2,21 +2,60 @@
All notable changes to the **Prowler SDK** are documented in this file.
## [5.25.0] (Prowler UNRELEASED)
## [5.26.0] (Prowler UNRELEASED)
### 🚀 Added
- GitHub Actions service for scanning workflow security issues using zizmor [(#10607)](https://github.com/prowler-cloud/prowler/pull/10607)
- `bedrock_guardrails_configured` check for AWS provider [(#10844)](https://github.com/prowler-cloud/prowler/pull/10844)
- Universal compliance pipeline integrated into the CLI: `--list-compliance` and `--list-compliance-requirements` show universal frameworks, and CSV plus OCSF outputs are generated for any framework declaring a `TableConfig` [(#10301)](https://github.com/prowler-cloud/prowler/pull/10301)
- ASD Essential Eight Maturity Model compliance framework for AWS (Maturity Level One, Nov 2023) [(#10808)](https://github.com/prowler-cloud/prowler/pull/10808)
### 🔄 Changed
- `route53_dangling_ip_subdomain_takeover` now also flags `CNAME` records pointing to S3 website endpoints whose buckets are missing from the account [(#10920)](https://github.com/prowler-cloud/prowler/pull/10920)
- Azure Network Watcher flow log checks now require workspace-backed Traffic Analytics for `network_flow_log_captured_sent` and align metadata with VNet-compatible flow log guidance [(#10645)](https://github.com/prowler-cloud/prowler/pull/10645)
- Azure compliance entries for legacy Network Watcher flow log controls now use retirement-aware guidance and point new deployments to VNet flow logs
- AWS CodeBuild service now batches `BatchGetProjects` and `BatchGetBuilds` calls per region (up to 100 items per call) to reduce API call volume and prevent throttling-induced false positives in `codebuild_project_not_publicly_accessible` [(#10639)](https://github.com/prowler-cloud/prowler/pull/10639)
- `display_compliance_table` dispatch switched from substring `in` checks to `startswith` to prevent false matches between similarly named frameworks (e.g. `cisa` vs `cis`) [(#10301)](https://github.com/prowler-cloud/prowler/pull/10301)
### 🐞 Fixed
- AWS SDK test isolation: autouse `mock_aws` fixture and leak detector in `conftest.py` to prevent tests from hitting real AWS endpoints, with idempotent organization setup for tests calling `set_mocked_aws_provider` multiple times [(#10605)](https://github.com/prowler-cloud/prowler/pull/10605)
- AWS `boto` user agent extra is now applied to every client [(#10944)](https://github.com/prowler-cloud/prowler/pull/10944)
### 🔐 Security
- Parser-mismatch SSRF in image provider registry auth where crafted bearer-token realms and pagination links could force requests to internal addresses and leak credentials cross-origin [(#10945)](https://github.com/prowler-cloud/prowler/pull/10945)
---
## [5.25.1] (Prowler v5.25.1)
### 🐞 Fixed
- `KeyError` when generating compliance outputs after the CLI scan [#10919](https://github.com/prowler-cloud/prowler/pull/10919)
- Kubernetes OCSF `provider_uid` now uses the cluster name in in-cluster mode (so `--cluster-name` is correctly reflected in findings) and keeps the kubeconfig context in kubeconfig mode [(#10483)](https://github.com/prowler-cloud/prowler/pull/10483)
---
## [5.25.0] (Prowler v5.25.0)
### 🚀 Added
- `--repo-list-file` CLI flag for GitHub provider to load repositories from a file [(#10501)](https://github.com/prowler-cloud/prowler/pull/10501)
- SARIF output format for the IaC provider, enabling GitHub Code Scanning integration via `--output-formats sarif` [(#10626)](https://github.com/prowler-cloud/prowler/pull/10626)
- `repository_default_branch_dismisses_stale_reviews` check for GitHub provider to ensure stale pull request approvals are dismissed when new commits are pushed [(#10569)](https://github.com/prowler-cloud/prowler/pull/10569)
- Official Prowler GitHub Action (`prowler-cloud/prowler@5.25`) for running scans in GitHub workflows with optional `--push-to-cloud` and SARIF upload to GitHub Code Scanning [(#10872)](https://github.com/prowler-cloud/prowler/pull/10872)
- New `aspm` provider for Agent Security Posture Management with 101 checks across 10 categories covering identity, permissions, credentials, network, data access, runtime, supply chain, observability, compliance, and attack path analysis for AI agent deployments [(#10454)](https://github.com/prowler-cloud/prowler/pull/10454)
- GitHub Actions service for scanning workflow security issues using zizmor [(#10607)](https://github.com/prowler-cloud/prowler/pull/10607)
- `secretsmanager_has_restrictive_resource_policy` check for AWS provider [(#6985)](https://github.com/prowler-cloud/prowler/pull/6985)
### 🐞 Fixed
- Alibaba Cloud CS service SDK compatibility, harden other services and improve documentation [(#10871)](https://github.com/prowler-cloud/prowler/pull/10871)
- AWS Organizations metadata retrieval for delegated administrator scans by using the assumed role session instead of the pre-assume credentials [(#10894)](https://github.com/prowler-cloud/prowler/pull/10894)
- `admincenter_groups_not_public_visibility` check for M365 provider evaluating Security and Distribution groups, now restricted to Microsoft 365 (Unified) groups per CIS M365 Foundations 1.2.1 [(#10899)](https://github.com/prowler-cloud/prowler/pull/10899)
- Google Workspace check reports now store the actual domain or account resource subject instead of `provider.identity` [(#10901)](https://github.com/prowler-cloud/prowler/pull/10901)
- `entra_users_mfa_capable` evaluating disabled guest accounts; CIS 5.2.3.4 only targets enabled member users [(#10785)](https://github.com/prowler-cloud/prowler/pull/10785)
---
@@ -31,10 +70,6 @@ All notable changes to the **Prowler SDK** are documented in this file.
## [5.24.1] (Prowler v5.24.1)
### 🚀 Added
- `--repo-list-file` CLI flag for GitHub provider to load repositories from a file [(#10501)](https://github.com/prowler-cloud/prowler/pull/10501)
### 🔄 Changed
- `msgraph-sdk` from 1.23.0 to 1.55.0 and `azure-mgmt-resource` from 23.3.0 to 24.0.0, removing `marshmallow` as is a transitively dev dependency [(#10733)](https://github.com/prowler-cloud/prowler/pull/10733)
+56 -7
View File
@@ -45,7 +45,10 @@ from prowler.lib.check.check import (
)
from prowler.lib.check.checks_loader import load_checks_to_execute
from prowler.lib.check.compliance import update_checks_metadata_with_compliance
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.check.compliance_models import (
Compliance,
get_bulk_compliance_frameworks_universal,
)
from prowler.lib.check.custom_checks_metadata import (
parse_custom_checks_metadata_file,
update_checks_metadata,
@@ -75,7 +78,10 @@ from prowler.lib.outputs.compliance.cis.cis_oraclecloud import OracleCloudCIS
from prowler.lib.outputs.compliance.cisa_scuba.cisa_scuba_googleworkspace import (
GoogleWorkspaceCISASCuBA,
)
from prowler.lib.outputs.compliance.compliance import display_compliance_table
from prowler.lib.outputs.compliance.compliance import (
display_compliance_table,
process_universal_compliance_frameworks,
)
from prowler.lib.outputs.compliance.csa.csa_alibabacloud import AlibabaCloudCSA
from prowler.lib.outputs.compliance.csa.csa_aws import AWSCSA
from prowler.lib.outputs.compliance.csa.csa_azure import AzureCSA
@@ -84,6 +90,9 @@ from prowler.lib.outputs.compliance.csa.csa_oraclecloud import OracleCloudCSA
from prowler.lib.outputs.compliance.ens.ens_aws import AWSENS
from prowler.lib.outputs.compliance.ens.ens_azure import AzureENS
from prowler.lib.outputs.compliance.ens.ens_gcp import GCPENS
from prowler.lib.outputs.compliance.essential_eight.essential_eight_aws import (
EssentialEightAWS,
)
from prowler.lib.outputs.compliance.generic.generic import GenericCompliance
from prowler.lib.outputs.compliance.iso27001.iso27001_aws import AWSISO27001
from prowler.lib.outputs.compliance.iso27001.iso27001_azure import AzureISO27001
@@ -235,6 +244,8 @@ def prowler():
# Load compliance frameworks
logger.debug("Loading compliance frameworks from .json files")
universal_frameworks = {}
# Skip compliance frameworks for external-tool providers
if provider not in EXTERNAL_TOOL_PROVIDERS:
bulk_compliance_frameworks = Compliance.get_bulk(provider)
@@ -242,6 +253,8 @@ def prowler():
bulk_checks_metadata = update_checks_metadata_with_compliance(
bulk_compliance_frameworks, bulk_checks_metadata
)
# Load universal compliance frameworks for new rendering pipeline
universal_frameworks = get_bulk_compliance_frameworks_universal(provider)
# Update checks metadata if the --custom-checks-metadata-file is present
custom_checks_metadata = None
@@ -254,12 +267,12 @@ def prowler():
)
if args.list_compliance:
print_compliance_frameworks(bulk_compliance_frameworks)
all_frameworks = {**bulk_compliance_frameworks, **universal_frameworks}
print_compliance_frameworks(all_frameworks)
sys.exit()
if args.list_compliance_requirements:
print_compliance_requirements(
bulk_compliance_frameworks, args.list_compliance_requirements
)
all_frameworks = {**bulk_compliance_frameworks, **universal_frameworks}
print_compliance_requirements(all_frameworks, args.list_compliance_requirements)
sys.exit()
# Load checks to execute
@@ -276,6 +289,7 @@ def prowler():
provider=provider,
list_checks=getattr(args, "list_checks", False)
or getattr(args, "list_checks_json", False),
universal_frameworks=universal_frameworks,
)
# if --list-checks-json, dump a json file and exit
@@ -624,9 +638,29 @@ def prowler():
)
# Compliance Frameworks
# Source the framework listing from the union of `bulk_compliance_frameworks`
# and `universal_frameworks` so universal-only frameworks (e.g.
# `prowler/compliance/csa_ccm_4.0.json`) — which `Compliance.get_bulk(provider)`
# does not load — still reach `process_universal_compliance_frameworks` below.
# The provider-specific block subtracts the names handled by the universal
# processor so the legacy per-provider handlers only see frameworks that the
# bulk loader actually resolved.
input_compliance_frameworks = set(output_options.output_modes).intersection(
get_available_compliance_frameworks(provider)
set(bulk_compliance_frameworks.keys()) | set(universal_frameworks.keys())
)
# ── Universal compliance frameworks (provider-agnostic) ──
universal_processed = process_universal_compliance_frameworks(
input_compliance_frameworks=input_compliance_frameworks,
universal_frameworks=universal_frameworks,
finding_outputs=finding_outputs,
output_directory=output_options.output_directory,
output_filename=output_options.output_filename,
provider=provider,
generated_outputs=generated_outputs,
)
input_compliance_frameworks -= universal_processed
if provider == "aws":
for compliance_name in input_compliance_frameworks:
if compliance_name.startswith("cis_"):
@@ -642,6 +676,18 @@ def prowler():
)
generated_outputs["compliance"].append(cis)
cis.batch_write_data_to_file()
elif compliance_name.startswith("essential_eight"):
filename = (
f"{output_options.output_directory}/compliance/"
f"{output_options.output_filename}_{compliance_name}.csv"
)
essential_eight = EssentialEightAWS(
findings=finding_outputs,
compliance=bulk_compliance_frameworks[compliance_name],
file_path=filename,
)
generated_outputs["compliance"].append(essential_eight)
essential_eight.batch_write_data_to_file()
elif compliance_name == "mitre_attack_aws":
# Generate MITRE ATT&CK Finding Object
filename = (
@@ -1396,6 +1442,9 @@ def prowler():
output_options.output_filename,
output_options.output_directory,
compliance_overview,
universal_frameworks=universal_frameworks,
provider=provider,
output_formats=args.output_formats,
)
if compliance_overview:
print(
+8 -8
View File
@@ -6426,9 +6426,9 @@
}
],
"Checks": [
"bedrock_agent_guardrail_enabled",
"bedrock_guardrail_prompt_attack_filter_enabled",
"bedrock_guardrail_sensitive_information_filter_enabled",
"bedrock_agent_guardrail_enabled"
"bedrock_guardrail_sensitive_information_filter_enabled"
]
},
{
@@ -6485,9 +6485,9 @@
}
],
"Checks": [
"bedrock_agent_guardrail_enabled",
"bedrock_guardrail_prompt_attack_filter_enabled",
"bedrock_guardrail_sensitive_information_filter_enabled",
"bedrock_agent_guardrail_enabled"
"bedrock_guardrail_sensitive_information_filter_enabled"
]
},
{
@@ -6546,8 +6546,8 @@
}
],
"Checks": [
"bedrock_guardrail_sensitive_information_filter_enabled",
"bedrock_agent_guardrail_enabled"
"bedrock_agent_guardrail_enabled",
"bedrock_guardrail_sensitive_information_filter_enabled"
]
},
{
@@ -6606,8 +6606,8 @@
}
],
"Checks": [
"bedrock_guardrail_sensitive_information_filter_enabled",
"bedrock_agent_guardrail_enabled"
"bedrock_agent_guardrail_enabled",
"bedrock_guardrail_sensitive_information_filter_enabled"
]
},
{
File diff suppressed because it is too large Load Diff
@@ -2894,6 +2894,7 @@
"bedrock_agent_guardrail_enabled",
"bedrock_guardrail_prompt_attack_filter_enabled",
"bedrock_guardrail_sensitive_information_filter_enabled",
"bedrock_guardrails_configured",
"bedrock_model_invocation_logging_enabled",
"bedrock_model_invocation_logs_encryption_enabled",
"cloudformation_stack_outputs_find_secrets",
@@ -2898,6 +2898,7 @@
"bedrock_agent_guardrail_enabled",
"bedrock_guardrail_prompt_attack_filter_enabled",
"bedrock_guardrail_sensitive_information_filter_enabled",
"bedrock_guardrails_configured",
"bedrock_model_invocation_logging_enabled",
"bedrock_model_invocation_logs_encryption_enabled",
"cloudformation_stack_outputs_find_secrets",
+6 -6
View File
@@ -2276,9 +2276,9 @@
"Description": "Ensure that network flow logs are captured and fed into a central log analytics workspace.",
"RationaleStatement": "Network Flow Logs provide valuable insight into the flow of traffic around your network and feed into both Azure Monitor and Azure Sentinel (if in use), permitting thegeneration of visual flow diagrams to aid with analyzing for lateral movement, etc.",
"ImpactStatement": "The impact of configuring NSG Flow logs is primarily one of cost and configuration. If deployed, it will create storage accounts that hold minimal amounts of data on a 5-day lifecycle before feeding to Log Analytics Workspace. This will increase the amount of data stored and used by Azure Monitor.",
"RemediationProcedure": "From Azure Portal 1. Navigate to Network Watcher. 2. Select NSG flow logs. 3. Select + Create. 4. Select the desired Subscription. 5. Select + Select NSG. 6. Select a network security group. 7. Click Confirm selection. 8. Select or create a new Storage Account. 9. Input the retention in days to retain the log. 10. Click Next. 11. Under Configuration, select Version 2. 12. If rich analytics are required, select Enable Traffic Analytics, a processing interval, and a Log Analytics Workspace. 13. Select Next. 14. Optionally add Tags. 15. Select Review + create. 16. Select Create. Warning The remediation policy creates remediation deployment and names them by concatenating the subscription name and the resource group name. The MAXIMUM permitted length of a deployment name is 64 characters. Exceeding this will cause the remediation task to fail.",
"AuditProcedure": "From Azure Portal 1. Navigate to Network Watcher. 2. Select NSG flow logs 3. For each log you wish to audit select it from this view.",
"AdditionalInformation": "",
"RemediationProcedure": "From Azure Portal Existing NSG flow logs can still be reviewed under Network Watcher > Flow logs. If you already have NSG flow logs configured, ensure they remain enabled and that Traffic Analytics sends data to a Log Analytics Workspace until migration is complete. Azure no longer allows creation of new NSG flow logs after June 30, 2025. For new or migrated deployments, create Virtual network flow logs instead: 1. Navigate to Network Watcher. 2. Select Flow logs. 3. Select + Create. 4. Select the desired Subscription. 5. For Flow log type, select Virtual network. 6. Select + Select target resource. 7. Select a virtual network. 8. Click Confirm selection. 9. Select or create a new Storage Account. 10. Input the retention in days to retain the log. 11. Click Next. 12. Under Analytics, select Version 2, enable Traffic Analytics, and select a Log Analytics Workspace. 13. Select Next. 14. Optionally add Tags. 15. Select Review + create. 16. Select Create.",
"AuditProcedure": "From Azure Portal 1. Navigate to Network Watcher. 2. Select Flow logs. 3. Review existing Network security group flow logs, if any remain, to ensure they are enabled and configured to send logs to a Log Analytics Workspace. 4. Review Virtual network flow logs for new or migrated coverage.",
"AdditionalInformation": "On September 30, 2027, NSG flow logs will be retired, and creating new NSG flow logs has not been possible since June 30, 2025. Azure recommends migrating to virtual network flow logs, which address NSG flow log limitations. After retirement, traffic analytics using NSG flow logs will no longer be supported, and existing NSG flow log resources will be deleted. Previously collected NSG flow log records will remain available per their retention policies.",
"DefaultValue": "By default Network Security Group logs are not sent to Log Analytics.",
"References": "https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-portal:https://docs.microsoft.com/en-us/security/benchmark/azure/security-controls-v3-logging-threat-detection#lt-4-enable-network-logging-for-security-investigation"
}
@@ -2702,9 +2702,9 @@
"Description": "Network Security Group Flow Logs should be enabled and the retention period set to greater than or equal to 90 days.",
"RationaleStatement": "Flow logs enable capturing information about IP traffic flowing in and out of network security groups. Logs can be used to check for anomalies and give insight into suspected breaches.",
"ImpactStatement": "This will keep IP traffic logs for longer than 90 days. As a level 2, first determine your need to retain data, then apply your selection here. As this is data stored for longer, your monthly storage costs will increase depending on your data use.",
"RemediationProcedure": "From Azure Portal 1. Go to Network Watcher 2. Select NSG flow logs blade in the Logs section 3. Select each Network Security Group from the list 4. Ensure Status is set to On 5. Ensure Retention (days) setting greater than 90 days 6. Select your storage account in the Storage account field 7. Select Save From Azure CLI Enable the NSG flow logs and set the Retention (days) to greater than or equal to 90 days. az network watcher flow-log configure --nsg <NameorID of the Network Security Group> --enabled true --resource-group <resourceGroupName> --retention 91 -- storage-account <NameorID of the storage account to save flow logs>",
"AuditProcedure": "From Azure Portal 1. Go to Network Watcher 2. Select NSG flow logs blade in the Logs section 3. Select each Network Security Group from the list 4. Ensure Status is set to On 5. Ensure Retention (days) setting greater than 90 days From Azure CLI az network watcher flow-log show --resource-group <resourceGroup> --nsg <NameorID of the NetworkSecurityGroup> --query 'retentionPolicy' Ensure that enabled is set to true and days is set to greater then or equal to 90.",
"AdditionalInformation": "",
"RemediationProcedure": "From Azure Portal Existing NSG flow logs can still be reviewed under Network Watcher > Flow logs. If you already have NSG flow logs configured, ensure Status is set to On and Retention (days) is set to 0, 90, or a number greater than 90 until migration is complete. Azure no longer allows creation of new NSG flow logs after June 30, 2025. For new or migrated deployments, configure Virtual network flow logs instead and set Retention days to 0, 90, or a number greater than 90. From Azure CLI Update an existing flow log retention policy with az network watcher flow-log update --location <location> --name <flow-log> --retention <number-of-days>.",
"AuditProcedure": "From Azure Portal 1. Go to Network Watcher. 2. Select Flow logs. 3. Review existing Network security group flow logs, if any remain, and ensure Status is set to On and Retention (days) is set to 0, 90, or a number greater than 90. 4. Review Virtual network flow logs for new or migrated coverage. From Azure CLI az network watcher flow-log list --location <location> --query [*].[name,retentionPolicy,targetResourceId] Ensure each relevant flow log has retention days set to 0, 90, or a number greater than 90.",
"AdditionalInformation": "On September 30, 2027, NSG flow logs will be retired, and creating new NSG flow logs has not been possible since June 30, 2025. Azure recommends migrating to virtual network flow logs, which address NSG flow log limitations. After retirement, traffic analytics using NSG flow logs will no longer be supported, and existing NSG flow log resources will be deleted. Previously collected NSG flow log records will remain available per their retention policies.",
"DefaultValue": "By default, Network Security Group Flow Logs are disabled.",
"References": "https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-overview:https://docs.microsoft.com/en-us/cli/azure/network/watcher/flow-log?view=azure-cli-latest:https://docs.microsoft.com/en-us/security/benchmark/azure/security-controls-v3-logging-threat-detection#lt-6-configure-log-storage-retention"
}
+6 -6
View File
@@ -2241,9 +2241,9 @@
"Description": "Ensure that network flow logs are captured and fed into a central log analytics workspace.",
"RationaleStatement": "Network Flow Logs provide valuable insight into the flow of traffic around your network and feed into both Azure Monitor and Azure Sentinel (if in use), permitting the generation of visual flow diagrams to aid with analyzing for lateral movement, etc.",
"ImpactStatement": "The impact of configuring NSG Flow logs is primarily one of cost and configuration. If deployed, it will create storage accounts that hold minimal amounts of data on a 5-day lifecycle before feeding to Log Analytics Workspace. This will increase the amount of data stored and used by Azure Monitor.",
"RemediationProcedure": "**From Azure Portal** 1. Navigate to `Network Watcher`. 1. Select `NSG flow logs`. 1. Select `+ Create`. 1. Select the desired Subscription. 1. Select `+ Select NSG`. 1. Select a network security group. 1. Click `Confirm selection`. 1. Select or create a new Storage Account. 1. Input the retention in days to retain the log. 1. Click `Next`. 1. Under `Configuration`, select `Version 2`. 1. If rich analytics are required, select `Enable Traffic Analytics`, a processing interval, and a `Log Analytics Workspace`. 1. Select `Next`. 1. Optionally add Tags. 1. Select `Review + create`. 1. Select `Create`. ***Warning*** The remediation policy creates remediation deployment and names them by concatenating the subscription name and the resource group name. The MAXIMUM permitted length of a deployment name is 64 characters. Exceeding this will cause the remediation task to fail.",
"AuditProcedure": "**From Azure Portal** 1. Navigate to `Network Watcher`. 1. Select `NSG flow logs` 1. For each log you wish to audit select it from this view. **From Azure Policy** If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure. If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions - **Policy ID:** [27960feb-a23c-4577-8d36-ef8b5f35e0be](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F27960feb-a23c-4577-8d36-ef8b5f35e0be) **- Name:** 'All flow log resources should be in enabled state' - **Policy ID:** [c251913d-7d24-4958-af87-478ed3b9ba41](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2Fc251913d-7d24-4958-af87-478ed3b9ba41) **- Name:** 'Flow logs should be configured for every network security group' - **Policy ID:** [4c3c6c5f-0d47-4402-99b8-aa543dd8bcee](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F4c3c6c5f-0d47-4402-99b8-aa543dd8bcee) **- Name:** 'Flow logs should be configured for every virtual network'",
"AdditionalInformation": "",
"RemediationProcedure": "**From Azure Portal** Existing NSG flow logs can still be reviewed under `Network Watcher` > `Flow logs`. If you already have NSG flow logs configured, ensure they remain enabled and that `Traffic Analytics` sends data to a `Log Analytics Workspace` until migration is complete. Azure no longer allows creation of new NSG flow logs after June 30, 2025. For new or migrated deployments, create `Virtual network` flow logs instead: 1. Navigate to `Network Watcher`. 1. Select `Flow logs`. 1. Select `+ Create`. 1. Select the desired Subscription. 1. For `Flow log type`, select `Virtual network`. 1. Select `+ Select target resource`. 1. Select `Virtual network`. 1. Select a virtual network. 1. Click `Confirm selection`. 1. Select or create a new Storage Account. 1. Input the retention in days to retain the log. 1. Click `Next`. 1. Under `Analytics`, for `Flow log version`, select `Version 2`. 1. Check the box next to `Enable traffic analytics`. 1. Select a processing interval. 1. Select a `Log Analytics Workspace`. 1. Select `Next`. 1. Optionally add Tags. 1. Select `Review + create`. 1. Select `Create`.",
"AuditProcedure": "**From Azure Portal** 1. Navigate to `Network Watcher`. 1. Select `Flow logs`. 1. Review existing `Network security group` flow logs, if any remain, to ensure they are enabled and configured to send logs to a `Log Analytics Workspace`. 1. Review `Virtual network` flow logs for new or migrated coverage. **From Azure Policy** If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure. If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions - **Policy ID:** [27960feb-a23c-4577-8d36-ef8b5f35e0be](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F27960feb-a23c-4577-8d36-ef8b5f35e0be) **- Name:** 'All flow log resources should be in enabled state' - **Policy ID:** [c251913d-7d24-4958-af87-478ed3b9ba41](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2Fc251913d-7d24-4958-af87-478ed3b9ba41) **- Name:** 'Flow logs should be configured for every network security group' - **Policy ID:** [4c3c6c5f-0d47-4402-99b8-aa543dd8bcee](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F4c3c6c5f-0d47-4402-99b8-aa543dd8bcee) **- Name:** 'Flow logs should be configured for every virtual network'",
"AdditionalInformation": "On September 30, 2027, NSG flow logs will be retired, and creating new NSG flow logs has not been possible since June 30, 2025. Azure recommends migrating to virtual network flow logs, which address NSG flow log limitations. After retirement, traffic analytics using NSG flow logs will no longer be supported, and existing NSG flow log resources will be deleted. Previously collected NSG flow log records will remain available per their retention policies.",
"DefaultValue": "By default Network Security Group logs are not sent to Log Analytics.",
"References": "https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-portal:https://learn.microsoft.com/en-us/security/benchmark/azure/mcsb-logging-threat-detection#lt-4-enable-network-logging-for-security-investigation"
}
@@ -2627,9 +2627,9 @@
"Description": "Network Security Group Flow Logs should be enabled and the retention period set to greater than or equal to 90 days.",
"RationaleStatement": "Flow logs enable capturing information about IP traffic flowing in and out of network security groups. Logs can be used to check for anomalies and give insight into suspected breaches.",
"ImpactStatement": "This will keep IP traffic logs for longer than 90 days. As a level 2, first determine your need to retain data, then apply your selection here. As this is data stored for longer, your monthly storage costs will increase depending on your data use.",
"RemediationProcedure": "**From Azure Portal** 1. Go to `Network Watcher` 2. Select `NSG flow logs` blade in the Logs section 3. Select each Network Security Group from the list 4. Ensure `Status` is set to `On` 5. Ensure `Retention (days)` setting `greater than 90 days` 6. Select your storage account in the `Storage account` field 7. Select `Save` **From Azure CLI** Enable the `NSG flow logs` and set the Retention (days) to greater than or equal to 90 days. ``` az network watcher flow-log configure --nsg <NameorID of the Network Security Group> --enabled true --resource-group <resourceGroupName> --retention 91 --storage-account <NameorID of the storage account to save flow logs> ```",
"AuditProcedure": "**From Azure Portal** 1. Go to `Network Watcher` 2. Select `NSG flow logs` blade in the Logs section 3. Select each Network Security Group from the list 4. Ensure `Status` is set to `On` 5. Ensure `Retention (days)` setting `greater than 90 days` **From Azure CLI** ``` az network watcher flow-log show --resource-group <resourceGroup> --nsg <NameorID of the NetworkSecurityGroup> --query 'retentionPolicy' ``` Ensure that `enabled` is set to `true` and `days` is set to `greater then or equal to 90`. **From Azure Policy** If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure. If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions - **Policy ID:** [5e1cd26a-5090-4fdb-9d6a-84a90335e22d](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F5e1cd26a-5090-4fdb-9d6a-84a90335e22d) **- Name:** 'Configure network security groups to use specific workspace, storage account and flowlog retention policy for traffic analytics'",
"AdditionalInformation": "",
"RemediationProcedure": "**From Azure Portal** Existing NSG flow logs can still be reviewed under `Network Watcher` > `Flow logs`. If you already have NSG flow logs configured, ensure `Status` is set to `On` and `Retention (days)` is set to `0`, `90`, or a number greater than `90` until migration is complete. Azure no longer allows creation of new NSG flow logs after June 30, 2025. For new or migrated deployments, configure `Virtual network` flow logs instead and set `Retention days` to `0`, `90`, or a number greater than `90`. **From Azure CLI** Update an existing flow log retention policy with: ``` az network watcher flow-log update --location <location> --name <flow-log> --retention <number-of-days> ```",
"AuditProcedure": "**From Azure Portal** 1. Go to `Network Watcher`. 1. Select `Flow logs`. 1. Review existing `Network security group` flow logs, if any remain, and ensure `Status` is set to `On` and `Retention (days)` is set to `0`, `90`, or a number greater than `90`. 1. Review `Virtual network` flow logs for new or migrated coverage. **From Azure CLI** ``` az network watcher flow-log list --location <location> --query [*].[name,retentionPolicy,targetResourceId] ``` Ensure each relevant flow log has retention days set to `0`, `90`, or a number greater than `90`. **From Azure Policy** If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure. If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions - **Policy ID:** [5e1cd26a-5090-4fdb-9d6a-84a90335e22d](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F5e1cd26a-5090-4fdb-9d6a-84a90335e22d) **- Name:** 'Configure network security groups to use specific workspace, storage account and flowlog retention policy for traffic analytics'",
"AdditionalInformation": "On September 30, 2027, NSG flow logs will be retired, and creating new NSG flow logs has not been possible since June 30, 2025. Azure recommends migrating to virtual network flow logs, which address NSG flow log limitations. After retirement, traffic analytics using NSG flow logs will no longer be supported, and existing NSG flow log resources will be deleted. Previously collected NSG flow log records will remain available per their retention policies.",
"DefaultValue": "By default, Network Security Group Flow Logs are `disabled`.",
"References": "https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-overview:https://docs.microsoft.com/en-us/cli/azure/network/watcher/flow-log?view=azure-cli-latest:https://learn.microsoft.com/en-us/security/benchmark/azure/mcsb-logging-threat-detection#lt-6-configure-log-storage-retention"
}
+6 -6
View File
@@ -2548,9 +2548,9 @@
"Description": "Ensure that network flow logs are captured and fed into a central log analytics workspace.",
"RationaleStatement": "Network Flow Logs provide valuable insight into the flow of traffic around your network and feed into both Azure Monitor and Azure Sentinel (if in use), permitting the generation of visual flow diagrams to aid with analyzing for lateral movement, etc.",
"ImpactStatement": "The impact of configuring NSG Flow logs is primarily one of cost and configuration. If deployed, it will create storage accounts that hold minimal amounts of data on a 5-day lifecycle before feeding to Log Analytics Workspace.This will increase the amount of data stored and used by Azure Monitor.",
"RemediationProcedure": "**Remediate from Azure Portal**1. Navigate to `Network Watcher`.1. Under `Logs`, select `Flow logs`.1. Select `+ Create`.1. Select the desired Subscription.1. For `Flow log type`, select `Network security group`.1. Select `+ Select target resource`.1. Select `Network security group`.1. Select a network security group.1. Click `Confirm selection`.1. Select or create a new Storage Account.1. If using a v2 storage account, input the retention in days to retain the log.1. Click `Next`.1. Under `Analytics`, for `Flow log version`, select `Version 2`.1. Check the box next to `Enable traffic analytics`.1. Select a processing interval.1. Select a `Log Analytics Workspace`.1. Select `Next`.1. Optionally add Tags.1. Select `Review + create`.1. Select `Create`.***Warning***The remediation policy creates remediation deployment and names them by concatenating the subscription name and the resource group name. The MAXIMUM permitted length of a deployment name is 64 characters. Exceeding this will cause the remediation task to fail.",
"AuditProcedure": "**Audit from Azure Portal**1. Navigate to `Network Watcher`.1. Under `Logs`, select `Flow logs`.1. Click `Add filter`.1. From the `Filter` drop-down, select `Flow log type`.1. From the `Value` drop-down, check `Network security group` only.1. Click `Apply`.1. Ensure that at least one network security group flow log is listed and is configured to send logs to a `Log Analytics Workspace`.**Audit from Azure Policy**If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure.If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions- **Policy ID:** [27960feb-a23c-4577-8d36-ef8b5f35e0be](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F27960feb-a23c-4577-8d36-ef8b5f35e0be) **- Name:** 'All flow log resources should be in enabled state'- **Policy ID:** [c251913d-7d24-4958-af87-478ed3b9ba41](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2Fc251913d-7d24-4958-af87-478ed3b9ba41) **- Name:** 'Flow logs should be configured for every network security group'- **Policy ID:** [4c3c6c5f-0d47-4402-99b8-aa543dd8bcee](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F4c3c6c5f-0d47-4402-99b8-aa543dd8bcee) **- Name:** 'Flow logs should be configured for every virtual network'",
"AdditionalInformation": "",
"RemediationProcedure": "**Remediate from Azure Portal** Existing NSG flow logs can still be reviewed under `Network Watcher` > `Flow logs`. If you already have NSG flow logs configured, ensure they remain enabled and that `Traffic Analytics` sends data to a `Log Analytics Workspace` until migration is complete. Azure no longer allows creation of new NSG flow logs after June 30, 2025. For new or migrated deployments, create `Virtual network` flow logs instead: 1. Navigate to `Network Watcher`.1. Under `Logs`, select `Flow logs`.1. Select `+ Create`.1. Select the desired Subscription.1. For `Flow log type`, select `Virtual network`.1. Select `+ Select target resource`.1. Select `Virtual network`.1. Select a virtual network.1. Click `Confirm selection`.1. Select or create a new Storage Account.1. If using a v2 storage account, input the retention in days to retain the log.1. Click `Next`.1. Under `Analytics`, for `Flow log version`, select `Version 2`.1. Check the box next to `Enable traffic analytics`.1. Select a processing interval.1. Select a `Log Analytics Workspace`.1. Select `Next`.1. Optionally add Tags.1. Select `Review + create`.1. Select `Create`.",
"AuditProcedure": "**Audit from Azure Portal**1. Navigate to `Network Watcher`.1. Under `Logs`, select `Flow logs`.1. Click `Add filter`.1. From the `Filter` drop-down, select `Flow log type`.1. Review existing `Network security group` flow logs, if any remain, to ensure they are enabled and configured to send logs to a `Log Analytics Workspace`.1. Review `Virtual network` flow logs for new or migrated coverage.**Audit from Azure Policy**If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure.If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions- **Policy ID:** [27960feb-a23c-4577-8d36-ef8b5f35e0be](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F27960feb-a23c-4577-8d36-ef8b5f35e0be) **- Name:** 'All flow log resources should be in enabled state'- **Policy ID:** [c251913d-7d24-4958-af87-478ed3b9ba41](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2Fc251913d-7d24-4958-af87-478ed3b9ba41) **- Name:** 'Flow logs should be configured for every network security group'- **Policy ID:** [4c3c6c5f-0d47-4402-99b8-aa543dd8bcee](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F4c3c6c5f-0d47-4402-99b8-aa543dd8bcee) **- Name:** 'Flow logs should be configured for every virtual network'",
"AdditionalInformation": "On September 30, 2027, NSG flow logs will be retired, and creating new NSG flow logs has not been possible since June 30, 2025. Azure recommends migrating to virtual network flow logs, which address NSG flow log limitations. After retirement, traffic analytics using NSG flow logs will no longer be supported, and existing NSG flow log resources will be deleted. Previously collected NSG flow log records will remain available per their retention policies.",
"DefaultValue": "By default Network Security Group logs are not sent to Log Analytics.",
"References": "https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-portal:https://learn.microsoft.com/en-us/security/benchmark/azure/mcsb-logging-threat-detection#lt-4-enable-network-logging-for-security-investigation"
}
@@ -2934,9 +2934,9 @@
"Description": "Network Security Group Flow Logs should be enabled and the retention period set to greater than or equal to 90 days.",
"RationaleStatement": "Flow logs enable capturing information about IP traffic flowing in and out of network security groups. Logs can be used to check for anomalies and give insight into suspected breaches.",
"ImpactStatement": "This will keep IP traffic logs for longer than 90 days. As a level 2, first determine your need to retain data, then apply your selection here. As this is data stored for longer, your monthly storage costs will increase depending on your data use.",
"RemediationProcedure": "**Remediate from Azure Portal**1. Go to `Network Watcher`2. Select `NSG flow logs` blade in the Logs section3. Select each Network Security Group from the list4. Ensure `Status` is set to `On`5. Ensure `Retention (days)` setting `greater than 90 days`6. Select your storage account in the `Storage account` field7. Select `Save`**Remediate from Azure CLI**Enable the `NSG flow logs` and set the Retention (days) to greater than or equal to 90 days.```az network watcher flow-log configure --nsg <NameorID of the Network Security Group> --enabled true --resource-group <resourceGroupName> --retention 91 --storage-account <NameorID of the storage account to save flow logs>```",
"AuditProcedure": "**Audit from Azure Portal**1. Go to `Network Watcher`2. Select `NSG flow logs` blade in the Logs section3. Select each Network Security Group from the list4. Ensure `Status` is set to `On`5. Ensure `Retention (days)` setting `greater than 90 days`**Audit from Azure CLI**```az network watcher flow-log show --resource-group <resourceGroup> --nsg <NameorID of the NetworkSecurityGroup> --query 'retentionPolicy'```Ensure that `enabled` is set to `true` and `days` is set to `greater then or equal to 90`.**Audit from Azure Policy**If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure.If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions- **Policy ID:** [5e1cd26a-5090-4fdb-9d6a-84a90335e22d](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F5e1cd26a-5090-4fdb-9d6a-84a90335e22d) **- Name:** 'Configure network security groups to use specific workspace, storage account and flowlog retention policy for traffic analytics'",
"AdditionalInformation": "",
"RemediationProcedure": "**Remediate from Azure Portal** Existing NSG flow logs can still be reviewed under `Network Watcher` > `Flow logs`. If you already have NSG flow logs configured, ensure `Status` is set to `On` and `Retention (days)` is set to `0`, `90`, or a number greater than `90` until migration is complete. Azure no longer allows creation of new NSG flow logs after June 30, 2025. For new or migrated deployments, configure `Virtual network` flow logs instead and set `Retention days` to `0`, `90`, or a number greater than `90`.**Remediate from Azure CLI**Update an existing flow log retention policy with:```az network watcher flow-log update --location <location> --name <flow-log> --retention <number-of-days>```",
"AuditProcedure": "**Audit from Azure Portal**1. Go to `Network Watcher`.1. Select `Flow logs`.1. Review existing `Network security group` flow logs, if any remain, and ensure `Status` is set to `On` and `Retention (days)` is set to `0`, `90`, or a number greater than `90`.1. Review `Virtual network` flow logs for new or migrated coverage.**Audit from Azure CLI**```az network watcher flow-log list --location <location> --query [*].[name,retentionPolicy,targetResourceId]```Ensure each relevant flow log has retention days set to `0`, `90`, or a number greater than `90`.**Audit from Azure Policy**If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure.If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions- **Policy ID:** [5e1cd26a-5090-4fdb-9d6a-84a90335e22d](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F5e1cd26a-5090-4fdb-9d6a-84a90335e22d) **- Name:** 'Configure network security groups to use specific workspace, storage account and flowlog retention policy for traffic analytics'",
"AdditionalInformation": "On September 30, 2027, NSG flow logs will be retired, and creating new NSG flow logs has not been possible since June 30, 2025. Azure recommends migrating to virtual network flow logs, which address NSG flow log limitations. After retirement, traffic analytics using NSG flow logs will no longer be supported, and existing NSG flow log resources will be deleted. Previously collected NSG flow log records will remain available per their retention policies.",
"DefaultValue": "By default, Network Security Group Flow Logs are `disabled`.",
"References": "https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-overview:https://docs.microsoft.com/en-us/cli/azure/network/watcher/flow-log?view=azure-cli-latest:https://learn.microsoft.com/en-us/security/benchmark/azure/mcsb-logging-threat-detection#lt-6-configure-log-storage-retention"
}
+6 -6
View File
@@ -1302,9 +1302,9 @@
"Description": "Ensure that network flow logs are captured and fed into a central log analytics workspace. **Retirement Notice** On September 30, 2027, network security group (NSG) flow logs will be retired. Starting June 30, 2025, it will no longer be possible to create new NSG flow logs. Azure recommends migrating to virtual network flow logs. Review https://azure.microsoft.com/en-gb/updates?id=Azure-NSG-flow-logs-Retirement for more information. For virtual network flow logs, consider applying the recommendation `Ensure that virtual network flow logs are captured and sent to Log Analytics` in this section.",
"RationaleStatement": "Network Flow Logs provide valuable insight into the flow of traffic around your network and feed into both Azure Monitor and Azure Sentinel (if in use), permitting the generation of visual flow diagrams to aid with analyzing for lateral movement, etc.",
"ImpactStatement": "The impact of configuring NSG Flow logs is primarily one of cost and configuration. If deployed, it will create storage accounts that hold minimal amounts of data on a 5-day lifecycle before feeding to Log Analytics Workspace. This will increase the amount of data stored and used by Azure Monitor.",
"RemediationProcedure": "**Remediate from Azure Portal** 1. Navigate to `Network Watcher`. 1. Under `Logs`, select `Flow logs`. 1. Select `+ Create`. 1. Select the desired Subscription. 1. For `Flow log type`, select `Network security group`. 1. Select `+ Select target resource`. 1. Select `Network security group`. 1. Select a network security group. 1. Click `Confirm selection`. 1. Select or create a new Storage Account. 1. If using a v2 storage account, input the retention in days to retain the log. 1. Click `Next`. 1. Under `Analytics`, for `Flow log version`, select `Version 2`. 1. Check the box next to `Enable traffic analytics`. 1. Select a processing interval. 1. Select a `Log Analytics Workspace`. 1. Select `Next`. 1. Optionally add Tags. 1. Select `Review + create`. 1. Select `Create`. ***Warning*** The remediation policy creates remediation deployment and names them by concatenating the subscription name and the resource group name. The MAXIMUM permitted length of a deployment name is 64 characters. Exceeding this will cause the remediation task to fail.",
"AuditProcedure": "**Audit from Azure Portal** 1. Navigate to `Network Watcher`. 1. Under `Logs`, select `Flow logs`. 1. Click `Add filter`. 1. From the `Filter` drop-down, select `Flow log type`. 1. From the `Value` drop-down, check `Network security group` only. 1. Click `Apply`. 1. Ensure that at least one network security group flow log is listed and is configured to send logs to a `Log Analytics Workspace`. **Audit from Azure Policy** If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure. If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions - **Policy ID:** [27960feb-a23c-4577-8d36-ef8b5f35e0be](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F27960feb-a23c-4577-8d36-ef8b5f35e0be) **- Name:** 'All flow log resources should be in enabled state' - **Policy ID:** [c251913d-7d24-4958-af87-478ed3b9ba41](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2Fc251913d-7d24-4958-af87-478ed3b9ba41) **- Name:** 'Flow logs should be configured for every network security group' - **Policy ID:** [4c3c6c5f-0d47-4402-99b8-aa543dd8bcee](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F4c3c6c5f-0d47-4402-99b8-aa543dd8bcee) **- Name:** 'Flow logs should be configured for every virtual network'",
"AdditionalInformation": "",
"RemediationProcedure": "**Remediate from Azure Portal** Existing NSG flow logs can still be reviewed under `Network Watcher` > `Flow logs`. If you already have NSG flow logs configured, ensure they remain enabled and that `Traffic Analytics` sends data to a `Log Analytics Workspace` until migration is complete. Azure no longer allows creation of new NSG flow logs after June 30, 2025. For new or migrated deployments, create `Virtual network` flow logs instead: 1. Navigate to `Network Watcher`. 1. Under `Logs`, select `Flow logs`. 1. Select `+ Create`. 1. Select the desired Subscription. 1. For `Flow log type`, select `Virtual network`. 1. Select `+ Select target resource`. 1. Select `Virtual network`. 1. Select a virtual network. 1. Click `Confirm selection`. 1. Select or create a new Storage Account. 1. If using a v2 storage account, input the retention in days to retain the log. 1. Click `Next`. 1. Under `Analytics`, for `Flow log version`, select `Version 2`. 1. Check the box next to `Enable traffic analytics`. 1. Select a processing interval. 1. Select a `Log Analytics Workspace`. 1. Select `Next`. 1. Optionally add Tags. 1. Select `Review + create`. 1. Select `Create`.",
"AuditProcedure": "**Audit from Azure Portal** 1. Navigate to `Network Watcher`. 1. Under `Logs`, select `Flow logs`. 1. Click `Add filter`. 1. From the `Filter` drop-down, select `Flow log type`. 1. Review existing `Network security group` flow logs, if any remain, to ensure they are enabled and configured to send logs to a `Log Analytics Workspace`. 1. Review `Virtual network` flow logs for new or migrated coverage. **Audit from Azure Policy** If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure. If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions - **Policy ID:** [27960feb-a23c-4577-8d36-ef8b5f35e0be](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F27960feb-a23c-4577-8d36-ef8b5f35e0be) **- Name:** 'All flow log resources should be in enabled state' - **Policy ID:** [c251913d-7d24-4958-af87-478ed3b9ba41](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2Fc251913d-7d24-4958-af87-478ed3b9ba41) **- Name:** 'Flow logs should be configured for every network security group' - **Policy ID:** [4c3c6c5f-0d47-4402-99b8-aa543dd8bcee](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F4c3c6c5f-0d47-4402-99b8-aa543dd8bcee) **- Name:** 'Flow logs should be configured for every virtual network'",
"AdditionalInformation": "On September 30, 2027, NSG flow logs will be retired, and creating new NSG flow logs has not been possible since June 30, 2025. Azure recommends migrating to virtual network flow logs, which address NSG flow log limitations. After retirement, traffic analytics using NSG flow logs will no longer be supported, and existing NSG flow log resources will be deleted. Previously collected NSG flow log records will remain available per their retention policies. For details, see the official announcement: https://azure.microsoft.com/en-gb/updates?id=Azure-NSG-flow-logs-Retirement.",
"References": "https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-portal:https://learn.microsoft.com/en-us/security/benchmark/azure/mcsb-logging-threat-detection#lt-4-enable-network-logging-for-security-investigation",
"DefaultValue": "By default Network Security Group logs are not sent to Log Analytics."
}
@@ -1789,9 +1789,9 @@
"Description": "Network Security Group Flow Logs should be enabled and the retention period set to greater than or equal to 90 days. **Retirement Notice** On September 30, 2027, network security group (NSG) flow logs will be retired. Starting June 30, 2025, it will no longer be possible to create new NSG flow logs. Azure recommends migrating to virtual network flow logs. Review https://azure.microsoft.com/en-gb/updates?id=Azure-NSG-flow-logs-Retirement for more information. For virtual network flow logs, consider applying the recommendation `Ensure that virtual network flow log retention days is set to greater than or equal to 90` in this section.",
"RationaleStatement": "Flow logs enable capturing information about IP traffic flowing in and out of network security groups. Logs can be used to check for anomalies and give insight into suspected breaches.",
"ImpactStatement": "This will keep IP traffic logs for longer than 90 days. As a level 2, first determine your need to retain data, then apply your selection here. As this is data stored for longer, your monthly storage costs will increase depending on your data use.",
"RemediationProcedure": "**Remediate from Azure Portal** 1. Go to `Network Watcher` 2. Select `NSG flow logs` blade in the Logs section 3. Select each Network Security Group from the list 4. Ensure `Status` is set to `On` 5. Ensure `Retention (days)` setting `greater than 90 days` 6. Select your storage account in the `Storage account` field 7. Select `Save` **Remediate from Azure CLI** Enable the `NSG flow logs` and set the Retention (days) to greater than or equal to 90 days. ``` az network watcher flow-log configure --nsg <NameorID of the Network Security Group> --enabled true --resource-group <resourceGroupName> --retention 91 --storage-account <NameorID of the storage account to save flow logs> ```",
"AuditProcedure": "**Audit from Azure Portal** 1. Go to `Network Watcher` 2. Select `NSG flow logs` blade in the Logs section 3. Select each Network Security Group from the list 4. Ensure `Status` is set to `On` 5. Ensure `Retention (days)` setting `greater than 90 days` **Audit from Azure CLI** ``` az network watcher flow-log show --resource-group <resourceGroup> --nsg <NameorID of the NetworkSecurityGroup> --query 'retentionPolicy' ``` Ensure that `enabled` is set to `true` and `days` is set to `greater then or equal to 90`. **Audit from Azure Policy** If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure. If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions - **Policy ID:** [5e1cd26a-5090-4fdb-9d6a-84a90335e22d](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F5e1cd26a-5090-4fdb-9d6a-84a90335e22d) **- Name:** 'Configure network security groups to use specific workspace, storage account and flowlog retention policy for traffic analytics'",
"AdditionalInformation": "",
"RemediationProcedure": "**Remediate from Azure Portal** Existing NSG flow logs can still be reviewed under `Network Watcher` > `Flow logs`. If you already have NSG flow logs configured, ensure `Status` is set to `On` and `Retention (days)` is set to `0`, `90`, or a number greater than `90` until migration is complete. Azure no longer allows creation of new NSG flow logs after June 30, 2025. For new or migrated deployments, configure `Virtual network` flow logs instead and set `Retention days` to `0`, `90`, or a number greater than `90`. **Remediate from Azure CLI** Update an existing flow log retention policy with: ``` az network watcher flow-log update --location <location> --name <flow-log> --retention <number-of-days> ```",
"AuditProcedure": "**Audit from Azure Portal** 1. Go to `Network Watcher`. 1. Select `Flow logs`. 1. Review existing `Network security group` flow logs, if any remain, and ensure `Status` is set to `On` and `Retention (days)` is set to `0`, `90`, or a number greater than `90`. 1. Review `Virtual network` flow logs for new or migrated coverage. **Audit from Azure CLI** ``` az network watcher flow-log list --location <location> --query [*].[name,retentionPolicy,targetResourceId] ``` Ensure each relevant flow log has retention days set to `0`, `90`, or a number greater than `90`. **Audit from Azure Policy** If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure. If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions - **Policy ID:** [5e1cd26a-5090-4fdb-9d6a-84a90335e22d](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F5e1cd26a-5090-4fdb-9d6a-84a90335e22d) **- Name:** 'Configure network security groups to use specific workspace, storage account and flowlog retention policy for traffic analytics'",
"AdditionalInformation": "On September 30, 2027, NSG flow logs will be retired, and creating new NSG flow logs has not been possible since June 30, 2025. Azure recommends migrating to virtual network flow logs, which address NSG flow log limitations. After retirement, traffic analytics using NSG flow logs will no longer be supported, and existing NSG flow log resources will be deleted. Previously collected NSG flow log records will remain available per their retention policies. For details, see the official announcement: https://azure.microsoft.com/en-gb/updates?id=Azure-NSG-flow-logs-Retirement.",
"References": "https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-overview:https://docs.microsoft.com/en-us/cli/azure/network/watcher/flow-log?view=azure-cli-latest:https://learn.microsoft.com/en-us/security/benchmark/azure/mcsb-logging-threat-detection#lt-6-configure-log-storage-retention",
"DefaultValue": "By default, Network Security Group Flow Logs are `disabled`."
}
+3 -3
View File
@@ -1292,9 +1292,9 @@
"Description": "Ensure that network flow logs are captured and fed into a central log analytics workspace. **Retirement Notice** On September 30, 2027, network security group (NSG) flow logs will be retired. Starting June 30, 2025, it will no longer be possible to create new NSG flow logs. Azure recommends migrating to virtual network flow logs. Review https://azure.microsoft.com/en-gb/updates?id=Azure-NSG-flow-logs-Retirement for more information. For virtual network flow logs, consider applying the recommendation `Ensure that virtual network flow logs are captured and sent to Log Analytics` in this section.",
"RationaleStatement": "Network Flow Logs provide valuable insight into the flow of traffic around your network and feed into both Azure Monitor and Azure Sentinel (if in use), permitting the generation of visual flow diagrams to aid with analyzing for lateral movement, etc.",
"ImpactStatement": "The impact of configuring NSG Flow logs is primarily one of cost and configuration. If deployed, it will create storage accounts that hold minimal amounts of data on a 5-day lifecycle before feeding to Log Analytics Workspace. This will increase the amount of data stored and used by Azure Monitor.",
"RemediationProcedure": "**Remediate from Azure Portal** 1. Navigate to `Network Watcher`. 1. Under `Logs`, select `Flow logs`. 1. Select `+ Create`. 1. Select the desired Subscription. 1. For `Flow log type`, select `Network security group`. 1. Select `+ Select target resource`. 1. Select `Network security group`. 1. Select a network security group. 1. Click `Confirm selection`. 1. Select or create a new Storage Account. 1. If using a v2 storage account, input the retention in days to retain the log. 1. Click `Next`. 1. Under `Analytics`, for `Flow log version`, select `Version 2`. 1. Check the box next to `Enable traffic analytics`. 1. Select a processing interval. 1. Select a `Log Analytics Workspace`. 1. Select `Next`. 1. Optionally add Tags. 1. Select `Review + create`. 1. Select `Create`. ***Warning*** The remediation policy creates remediation deployment and names them by concatenating the subscription name and the resource group name. The MAXIMUM permitted length of a deployment name is 64 characters. Exceeding this will cause the remediation task to fail.",
"AuditProcedure": "**Audit from Azure Portal** 1. Navigate to `Network Watcher`. 1. Under `Logs`, select `Flow logs`. 1. Click `Add filter`. 1. From the `Filter` drop-down, select `Flow log type`. 1. From the `Value` drop-down, check `Network security group` only. 1. Click `Apply`. 1. Ensure that at least one network security group flow log is listed and is configured to send logs to a `Log Analytics Workspace`. **Audit from Azure Policy** If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure. If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions - **Policy ID:** [27960feb-a23c-4577-8d36-ef8b5f35e0be](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F27960feb-a23c-4577-8d36-ef8b5f35e0be) **- Name:** 'All flow log resources should be in enabled state' - **Policy ID:** [c251913d-7d24-4958-af87-478ed3b9ba41](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2Fc251913d-7d24-4958-af87-478ed3b9ba41) **- Name:** 'Flow logs should be configured for every network security group' - **Policy ID:** [4c3c6c5f-0d47-4402-99b8-aa543dd8bcee](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F4c3c6c5f-0d47-4402-99b8-aa543dd8bcee) **- Name:** 'Flow logs should be configured for every virtual network'",
"AdditionalInformation": "",
"RemediationProcedure": "**Remediate from Azure Portal** Existing NSG flow logs can still be reviewed under `Network Watcher` > `Flow logs`. If you already have NSG flow logs configured, ensure they remain enabled and that `Traffic Analytics` sends data to a `Log Analytics Workspace` until migration is complete. Azure no longer allows creation of new NSG flow logs after June 30, 2025. For new or migrated deployments, create `Virtual network` flow logs instead: 1. Navigate to `Network Watcher`. 1. Under `Logs`, select `Flow logs`. 1. Select `+ Create`. 1. Select the desired Subscription. 1. For `Flow log type`, select `Virtual network`. 1. Select `+ Select target resource`. 1. Select `Virtual network`. 1. Select a virtual network. 1. Click `Confirm selection`. 1. Select or create a new Storage Account. 1. If using a v2 storage account, input the retention in days to retain the log. 1. Click `Next`. 1. Under `Analytics`, for `Flow log version`, select `Version 2`. 1. Check the box next to `Enable traffic analytics`. 1. Select a processing interval. 1. Select a `Log Analytics Workspace`. 1. Select `Next`. 1. Optionally add Tags. 1. Select `Review + create`. 1. Select `Create`.",
"AuditProcedure": "**Audit from Azure Portal** 1. Navigate to `Network Watcher`. 1. Under `Logs`, select `Flow logs`. 1. Click `Add filter`. 1. From the `Filter` drop-down, select `Flow log type`. 1. Review existing `Network security group` flow logs, if any remain, to ensure they are enabled and configured to send logs to a `Log Analytics Workspace`. 1. Review `Virtual network` flow logs for new or migrated coverage. **Audit from Azure Policy** If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure. If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions - **Policy ID:** [27960feb-a23c-4577-8d36-ef8b5f35e0be](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F27960feb-a23c-4577-8d36-ef8b5f35e0be) **- Name:** 'All flow log resources should be in enabled state' - **Policy ID:** [c251913d-7d24-4958-af87-478ed3b9ba41](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2Fc251913d-7d24-4958-af87-478ed3b9ba41) **- Name:** 'Flow logs should be configured for every network security group' - **Policy ID:** [4c3c6c5f-0d47-4402-99b8-aa543dd8bcee](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F4c3c6c5f-0d47-4402-99b8-aa543dd8bcee) **- Name:** 'Flow logs should be configured for every virtual network'",
"AdditionalInformation": "On September 30, 2027, NSG flow logs will be retired, and creating new NSG flow logs has not been possible since June 30, 2025. Azure recommends migrating to virtual network flow logs, which address NSG flow log limitations. After retirement, traffic analytics using NSG flow logs will no longer be supported, and existing NSG flow log resources will be deleted. Previously collected NSG flow log records will remain available per their retention policies. For details, see the official announcement: https://azure.microsoft.com/en-gb/updates?id=Azure-NSG-flow-logs-Retirement.",
"References": "https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-portal:https://learn.microsoft.com/en-us/security/benchmark/azure/mcsb-logging-threat-detection#lt-4-enable-network-logging-for-security-investigation",
"DefaultValue": "By default Network Security Group logs are not sent to Log Analytics."
}
@@ -709,17 +709,17 @@
},
{
"Id": "3.1.8",
"Description": "Ensure that Network Security Group Flow logs are captured and sent to Log Analytics",
"Description": "Ensure that Network Watcher flow logs are captured and sent to Log Analytics",
"Checks": [
"network_flow_log_captured_sent"
],
"Attributes": [
{
"Title": "Network Security Group Flow logs are captured and sent to Log Analytics",
"Title": "Network Watcher flow logs are captured and sent to Log Analytics",
"Section": "3. Logging and Monitoring",
"SubSection": "3.1 Logging",
"AttributeDescription": "Ensure that network flow logs are collected and sent to a central Log Analytics workspace for monitoring and analysis.",
"AdditionalInformation": "Capturing network flow logs provides visibility into traffic patterns across your network, helping detect anomalies, potential lateral movement, and security threats. These logs integrate with Azure Monitor and Azure Sentinel, enabling advanced analytics and visualization for improved network security and incident response.",
"AttributeDescription": "Ensure that Network Watcher flow logs for supported targets, such as virtual networks and network security groups, are collected and sent to a central Log Analytics workspace for monitoring and analysis.",
"AdditionalInformation": "Capturing Network Watcher flow logs provides visibility into traffic patterns across your network, helping detect anomalies, potential lateral movement, and security threats. These logs integrate with Azure Monitor and Azure Sentinel, enabling advanced analytics and visualization for improved network security and incident response. For new deployments, prefer virtual network flow logs because NSG flow logs are on the retirement path.",
"LevelOfRisk": 4,
"Weight": 100
}
@@ -763,17 +763,17 @@
},
{
"Id": "3.2.1",
"Description": "Ensure that Network Security Group Flow Log retention period is 'greater than 90 days'",
"Description": "Ensure that Network Watcher flow log retention period is '0 or at least 90 days'",
"Checks": [
"network_flow_log_more_than_90_days"
],
"Attributes": [
{
"Title": "Network Security Group Flow Log retention period is 'greater than 90 days'",
"Title": "Network Watcher flow log retention period is '0 or at least 90 days'",
"Section": "3. Logging and Monitoring",
"SubSection": "3.2 Retention",
"AttributeDescription": "Enable Network Security Group (NSG) Flow Logs and configure the retention period to at least 90 days to capture and store IP traffic data for security monitoring and analysis.",
"AdditionalInformation": "NSG Flow Logs provide visibility into network traffic, helping detect anomalies, unauthorized access, and potential security breaches. Retaining logs for at least 90 days ensures that historical data is available for incident investigation, compliance, and forensic analysis, strengthening overall network security monitoring.",
"AttributeDescription": "Enable Network Watcher flow logs for supported targets, such as virtual networks and network security groups, and configure the retention period to 0 for unlimited retention or at least 90 days to capture and store IP traffic data for security monitoring and analysis.",
"AdditionalInformation": "Network Watcher flow logs provide visibility into network traffic, helping detect anomalies, unauthorized access, and potential security breaches. Retaining logs for 0 days (unlimited) or at least 90 days ensures that historical data is available for incident investigation, compliance, and forensic analysis, strengthening overall network security monitoring. For new deployments, prefer virtual network flow logs because NSG flow logs are on the retirement path.",
"LevelOfRisk": 3,
"Weight": 10
}
+6 -4
View File
@@ -48,7 +48,7 @@ class _MutableTimestamp:
timestamp = _MutableTimestamp(datetime.today())
timestamp_utc = _MutableTimestamp(datetime.now(timezone.utc))
prowler_version = "5.25.0"
prowler_version = "5.26.0"
html_logo_url = "https://github.com/prowler-cloud/prowler/"
square_logo_img = "https://raw.githubusercontent.com/prowler-cloud/prowler/dc7d2d5aeb92fdf12e8604f42ef6472cd3e8e889/docs/img/prowler-logo-black.png"
aws_logo = "https://user-images.githubusercontent.com/38561120/235953920-3e3fba08-0795-41dc-b480-9bea57db9f2e.png"
@@ -87,8 +87,8 @@ def get_available_compliance_frameworks(provider=None):
providers = [p.value for p in Provider]
if provider:
providers = [provider]
for provider in providers:
compliance_dir = f"{actual_directory}/../compliance/{provider}"
for current_provider in providers:
compliance_dir = f"{actual_directory}/../compliance/{current_provider}"
if not os.path.isdir(compliance_dir):
continue
with os.scandir(compliance_dir) as files:
@@ -97,7 +97,9 @@ def get_available_compliance_frameworks(provider=None):
available_compliance_frameworks.append(
file.name.removesuffix(".json")
)
# Also scan top-level compliance/ for multi-provider JSONs
# Also scan top-level compliance/ for multi-provider (universal) JSONs.
# When a specific provider was requested, only include the framework if it
# declares support for that provider; otherwise include all universal frameworks.
compliance_root = f"{actual_directory}/../compliance"
if os.path.isdir(compliance_root):
with os.scandir(compliance_root) as files:
+1
View File
@@ -141,6 +141,7 @@ aws:
# ]
organizations_enabled_regions: []
organizations_trusted_delegated_administrators: []
organizations_trusted_ids: []
# AWS ECR
# aws.ecr_repositories_scan_vulnerabilities_in_latest_image
+30 -7
View File
@@ -299,12 +299,22 @@ def print_compliance_frameworks(
def print_compliance_requirements(
bulk_compliance_frameworks: dict, compliance_frameworks: list
):
from prowler.lib.check.compliance_models import ComplianceFramework
for compliance_framework in compliance_frameworks:
for key in bulk_compliance_frameworks.keys():
framework = bulk_compliance_frameworks[key].Framework
provider = bulk_compliance_frameworks[key].Provider
version = bulk_compliance_frameworks[key].Version
requirements = bulk_compliance_frameworks[key].Requirements
entry = bulk_compliance_frameworks[key]
is_universal = isinstance(entry, ComplianceFramework)
if is_universal:
framework = entry.framework
provider = entry.provider or "Multi-provider"
version = entry.version
requirements = entry.requirements
else:
framework = entry.Framework
provider = entry.Provider or "Multi-provider"
version = entry.Version
requirements = entry.Requirements
# We can list the compliance requirements for a given framework using the
# bulk_compliance_frameworks keys since they are the compliance specification file name
if compliance_framework == key:
@@ -313,10 +323,23 @@ def print_compliance_requirements(
)
for requirement in requirements:
checks = ""
for check in requirement.Checks:
checks += f" {Fore.YELLOW}\t\t{check}\n{Style.RESET_ALL}"
if is_universal:
req_checks = requirement.checks
req_id = requirement.id
req_description = requirement.description
else:
req_checks = requirement.Checks
req_id = requirement.Id
req_description = requirement.Description
if isinstance(req_checks, dict):
for prov, check_list in req_checks.items():
for check in check_list:
checks += f" {Fore.YELLOW}\t\t[{prov}] {check}\n{Style.RESET_ALL}"
else:
for check in req_checks:
checks += f" {Fore.YELLOW}\t\t{check}\n{Style.RESET_ALL}"
print(
f"Requirement Id: {Fore.MAGENTA}{requirement.Id}{Style.RESET_ALL}\n\t- Description: {requirement.Description}\n\t- Checks:\n{checks}"
f"Requirement Id: {Fore.MAGENTA}{req_id}{Style.RESET_ALL}\n\t- Description: {req_description}\n\t- Checks:\n{checks}"
)
+15 -5
View File
@@ -22,6 +22,7 @@ def load_checks_to_execute(
categories: set = None,
resource_groups: set = None,
list_checks: bool = False,
universal_frameworks: dict = None,
) -> set:
"""Generate the list of checks to execute based on the cloud provider and the input arguments given"""
try:
@@ -155,12 +156,21 @@ def load_checks_to_execute(
if not bulk_compliance_frameworks:
bulk_compliance_frameworks = Compliance.get_bulk(provider=provider)
for compliance_framework in compliance_frameworks:
checks_to_execute.update(
CheckMetadata.list(
bulk_compliance_frameworks=bulk_compliance_frameworks,
compliance_framework=compliance_framework,
# Try universal frameworks first (snake_case dict-keyed checks)
if (
universal_frameworks
and compliance_framework in universal_frameworks
):
fw = universal_frameworks[compliance_framework]
for req in fw.requirements:
checks_to_execute.update(req.checks.get(provider.lower(), []))
elif compliance_framework in bulk_compliance_frameworks:
checks_to_execute.update(
CheckMetadata.list(
bulk_compliance_frameworks=bulk_compliance_frameworks,
compliance_framework=compliance_framework,
)
)
)
# Handle if there are categories passed using --categories
elif categories:
+43
View File
@@ -102,6 +102,48 @@ class CIS_Requirement_Attribute(BaseModel):
References: str
class EssentialEight_Requirement_Attribute_MaturityLevel(str, Enum):
"""ASD Essential Eight Maturity Level"""
ML1 = "ML1"
ML2 = "ML2"
ML3 = "ML3"
class EssentialEight_Requirement_Attribute_AssessmentStatus(str, Enum):
"""Essential Eight Requirement Attribute Assessment Status"""
Manual = "Manual"
Automated = "Automated"
class EssentialEight_Requirement_Attribute_CloudApplicability(str, Enum):
"""How well the ASD control maps to AWS cloud infrastructure."""
Full = "full"
Partial = "partial"
Limited = "limited"
NonApplicable = "non-applicable"
# Essential Eight Requirement Attribute
class EssentialEight_Requirement_Attribute(BaseModel):
"""ASD Essential Eight Requirement Attribute"""
Section: str
MaturityLevel: EssentialEight_Requirement_Attribute_MaturityLevel
AssessmentStatus: EssentialEight_Requirement_Attribute_AssessmentStatus
CloudApplicability: EssentialEight_Requirement_Attribute_CloudApplicability
MitigatedThreats: list[str]
Description: str
RationaleStatement: str
ImpactStatement: str
RemediationProcedure: str
AuditProcedure: str
AdditionalInformation: str
References: str
# Well Architected Requirement Attribute
class AWS_Well_Architected_Requirement_Attribute(BaseModel):
"""AWS Well Architected Requirement Attribute"""
@@ -250,6 +292,7 @@ class Compliance_Requirement(BaseModel):
Name: Optional[str] = None
Attributes: list[
Union[
EssentialEight_Requirement_Attribute,
CIS_Requirement_Attribute,
ENS_Requirement_Attribute,
ISO27001_2013_Requirement_Attribute,
-32
View File
@@ -1279,38 +1279,6 @@ class CheckReportVercel(Check_Report):
return "global"
@dataclass
class CheckReportASPM(Check_Report):
"""Contains the ASPM Check's finding information.
Attributes:
resource_name: Human-readable agent name.
resource_id: Unique agent identifier.
environment: Deployment environment (prod/staging/dev).
cloud_provider: Cloud provider (aws/azure/gcp).
"""
resource_name: str
resource_id: str
environment: str
cloud_provider: str
def __init__(self, metadata: Dict, resource: Any) -> None:
"""Initialise the ASPM Check's finding information.
Args:
metadata: The check metadata.
resource: An AgentConfig instance or compatible dict.
"""
super().__init__(metadata, resource)
self.resource_name = getattr(
resource, "name", getattr(resource, "resource_name", "")
)
self.resource_id = getattr(resource, "id", getattr(resource, "resource_id", ""))
self.environment = getattr(resource, "environment", "unknown")
self.cloud_provider = getattr(resource, "cloud_provider", "unknown")
# Testing Pending
def load_check_metadata(metadata_file: str) -> CheckMetadata:
"""
+2 -31
View File
@@ -29,10 +29,10 @@ class ProwlerArgumentParser:
self.parser = argparse.ArgumentParser(
prog="prowler",
formatter_class=RawTextHelpFormatter,
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,googleworkspace,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,vercel,dashboard,iac,image,llm,aspm} ...",
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,googleworkspace,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,vercel,dashboard,iac,image,llm} ...",
epilog="""
Available Cloud Providers:
{aws,azure,gcp,kubernetes,m365,github,googleworkspace,iac,llm,image,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,vercel,aspm}
{aws,azure,gcp,kubernetes,m365,github,googleworkspace,iac,llm,image,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,vercel}
aws AWS Provider
azure Azure Provider
gcp GCP Provider
@@ -47,7 +47,6 @@ Available Cloud Providers:
iac IaC Provider
llm LLM Provider (Beta)
image Container Image Provider
aspm Agent Security Posture Management (ASPM) Provider (Beta)
nhn NHN Provider (Unofficial)
mongodbatlas MongoDB Atlas Provider
vercel Vercel Provider
@@ -452,31 +451,3 @@ Detailed documentation at https://docs.prowler.com
action="store_true",
help="Send a summary of the execution with a Slack APP in your channel. Environment variables SLACK_API_TOKEN and SLACK_CHANNEL_NAME are required (see more in https://docs.prowler.com/user-guide/cli/tutorials/integrations#configuration-of-the-integration-with-slack/).",
)
# Datadog Integration
datadog_subparser = self.common_providers_parser.add_argument_group(
"Datadog Integration"
)
datadog_subparser.add_argument(
"--datadog",
action="store_true",
help="Send findings to Datadog via the Logs API. Requires --datadog-api-key or the DATADOG_API_KEY environment variable.",
)
datadog_subparser.add_argument(
"--datadog-api-key",
nargs="?",
default=None,
metavar="DATADOG_API_KEY",
help="Datadog API key. Can also be set via the DATADOG_API_KEY environment variable.",
)
datadog_subparser.add_argument(
"--datadog-site",
nargs="?",
default="datadoghq.com",
metavar="DATADOG_SITE",
help="Datadog site to send findings to (default: datadoghq.com). Options: datadoghq.com, us3.datadoghq.com, us5.datadoghq.com, datadoghq.eu, ap1.datadoghq.com, ddog-gov.com.",
)
datadog_subparser.add_argument(
"--send-dd-only-fails",
action="store_true",
help="Send only FAIL findings to Datadog.",
)
+141 -62
View File
@@ -1,12 +1,17 @@
import sys
from prowler.lib.check.models import Check_Report
from prowler.lib.logger import logger
from prowler.lib.outputs.compliance.c5.c5 import get_c5_table
from prowler.lib.outputs.compliance.ccc.ccc import get_ccc_table
from prowler.lib.outputs.compliance.cis.cis import get_cis_table
from prowler.lib.outputs.compliance.compliance_check import ( # noqa: F401 - re-export for backward compatibility
get_check_compliance,
)
from prowler.lib.outputs.compliance.csa.csa import get_csa_table
from prowler.lib.outputs.compliance.ens.ens import get_ens_table
from prowler.lib.outputs.compliance.essential_eight.essential_eight import (
get_essential_eight_table,
)
from prowler.lib.outputs.compliance.generic.generic_table import (
get_generic_compliance_table,
)
@@ -17,6 +22,94 @@ from prowler.lib.outputs.compliance.mitre_attack.mitre_attack import (
from prowler.lib.outputs.compliance.prowler_threatscore.prowler_threatscore import (
get_prowler_threatscore_table,
)
from prowler.lib.outputs.compliance.universal.universal_table import get_universal_table
def process_universal_compliance_frameworks(
input_compliance_frameworks: set,
universal_frameworks: dict,
finding_outputs: list,
output_directory: str,
output_filename: str,
provider: str,
generated_outputs: dict,
) -> set:
"""Process universal compliance frameworks, generating CSV and OCSF outputs.
For each framework in *input_compliance_frameworks* that exists in
*universal_frameworks* and has an outputs.table_config, this function
creates both a CSV (UniversalComplianceOutput) and an OCSF JSON
(OCSFComplianceOutput) file. OCSF is always generated regardless of
the user's ``--output-formats`` flag.
The function is idempotent: it tracks already-created writers via
``generated_outputs["compliance"]`` keyed by ``file_path``. If invoked
again for the same framework (e.g. once per streaming batch), it
reuses the existing writer instead of recreating it. This guarantees
one output writer per framework for the whole execution and keeps
the OCSF JSON array valid across multiple calls.
Returns the set of framework names that were processed so the caller
can remove them before entering the legacy per-provider output loop.
"""
from prowler.lib.outputs.compliance.universal.ocsf_compliance import (
OCSFComplianceOutput,
)
from prowler.lib.outputs.compliance.universal.universal_output import (
UniversalComplianceOutput,
)
existing_writers = {
getattr(out, "file_path", None): out
for out in generated_outputs.get("compliance", [])
if isinstance(out, (UniversalComplianceOutput, OCSFComplianceOutput))
}
processed = set()
for compliance_name in input_compliance_frameworks:
if not (
compliance_name in universal_frameworks
and universal_frameworks[compliance_name].outputs
and universal_frameworks[compliance_name].outputs.table_config
):
continue
fw = universal_frameworks[compliance_name]
# CSV output
csv_path = (
f"{output_directory}/compliance/" f"{output_filename}_{compliance_name}.csv"
)
if csv_path not in existing_writers:
output = UniversalComplianceOutput(
findings=finding_outputs,
framework=fw,
file_path=csv_path,
provider=provider,
)
generated_outputs["compliance"].append(output)
existing_writers[csv_path] = output
output.batch_write_data_to_file()
# OCSF output (always generated for universal frameworks)
ocsf_path = (
f"{output_directory}/compliance/"
f"{output_filename}_{compliance_name}.ocsf.json"
)
if ocsf_path not in existing_writers:
ocsf_output = OCSFComplianceOutput(
findings=finding_outputs,
framework=fw,
file_path=ocsf_path,
provider=provider,
)
generated_outputs["compliance"].append(ocsf_output)
existing_writers[ocsf_path] = ocsf_output
ocsf_output.batch_write_data_to_file()
processed.add(compliance_name)
return processed
def display_compliance_table(
@@ -26,6 +119,9 @@ def display_compliance_table(
output_filename: str,
output_directory: str,
compliance_overview: bool,
universal_frameworks: dict = None,
provider: str = None,
output_formats: list = None,
) -> None:
"""
display_compliance_table generates the compliance table for the given compliance framework.
@@ -37,6 +133,9 @@ def display_compliance_table(
output_filename (str): The output filename
output_directory (str): The output directory
compliance_overview (bool): The compliance
universal_frameworks (dict): Optional universal ComplianceFramework objects
provider (str): The current provider (e.g. "aws") for multi-provider filtering
output_formats (list): The output formats to generate
Returns:
None
@@ -45,16 +144,24 @@ def display_compliance_table(
findings = [f for f in findings if f.check_metadata.CheckID in bulk_checks_metadata]
try:
if "ens_" in compliance_framework:
get_ens_table(
findings,
bulk_checks_metadata,
compliance_framework,
output_filename,
output_directory,
compliance_overview,
)
elif "cis_" in compliance_framework:
# Universal path: if the framework has TableConfig, use the universal renderer
if universal_frameworks and compliance_framework in universal_frameworks:
fw = universal_frameworks[compliance_framework]
if fw.outputs and fw.outputs.table_config:
get_universal_table(
findings,
bulk_checks_metadata,
compliance_framework,
output_filename,
output_directory,
compliance_overview,
framework=fw,
provider=provider,
output_formats=output_formats,
)
return
if compliance_framework.startswith("cis_"):
get_cis_table(
findings,
bulk_checks_metadata,
@@ -63,7 +170,16 @@ def display_compliance_table(
output_directory,
compliance_overview,
)
elif "mitre_attack" in compliance_framework:
elif compliance_framework.startswith("ens_"):
get_ens_table(
findings,
bulk_checks_metadata,
compliance_framework,
output_filename,
output_directory,
compliance_overview,
)
elif compliance_framework.startswith("mitre_attack"):
get_mitre_attack_table(
findings,
bulk_checks_metadata,
@@ -72,7 +188,7 @@ def display_compliance_table(
output_directory,
compliance_overview,
)
elif "kisa_isms_" in compliance_framework:
elif compliance_framework.startswith("kisa"):
get_kisa_ismsp_table(
findings,
bulk_checks_metadata,
@@ -81,7 +197,7 @@ def display_compliance_table(
output_directory,
compliance_overview,
)
elif "threatscore_" in compliance_framework:
elif compliance_framework.startswith("prowler_threatscore_"):
get_prowler_threatscore_table(
findings,
bulk_checks_metadata,
@@ -90,7 +206,7 @@ def display_compliance_table(
output_directory,
compliance_overview,
)
elif "csa_ccm_" in compliance_framework:
elif compliance_framework.startswith("csa_ccm_"):
get_csa_table(
findings,
bulk_checks_metadata,
@@ -99,7 +215,7 @@ def display_compliance_table(
output_directory,
compliance_overview,
)
elif "c5_" in compliance_framework:
elif compliance_framework.startswith("c5_"):
get_c5_table(
findings,
bulk_checks_metadata,
@@ -117,6 +233,15 @@ def display_compliance_table(
output_directory,
compliance_overview,
)
elif "essential_eight" in compliance_framework:
get_essential_eight_table(
findings,
bulk_checks_metadata,
compliance_framework,
output_filename,
output_directory,
compliance_overview,
)
else:
get_generic_compliance_table(
findings,
@@ -131,49 +256,3 @@ def display_compliance_table(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)
sys.exit(1)
# TODO: this should be in the Check class
def get_check_compliance(
finding: Check_Report, provider_type: str, bulk_checks_metadata: dict
) -> dict:
"""get_check_compliance returns a map with the compliance framework as key and the requirements where the finding's check is present.
Example:
{
"CIS-1.4": ["2.1.3"],
"CIS-1.5": ["2.1.3"],
}
Args:
finding (Any): The Check_Report finding
provider_type (str): The provider type
bulk_checks_metadata (dict): The bulk checks metadata
Returns:
dict: The compliance framework as key and the requirements where the finding's check is present.
"""
try:
check_compliance = {}
# We have to retrieve all the check's compliance requirements
if finding.check_metadata.CheckID in bulk_checks_metadata:
for compliance in bulk_checks_metadata[
finding.check_metadata.CheckID
].Compliance:
compliance_fw = compliance.Framework
if compliance.Version:
compliance_fw = f"{compliance_fw}-{compliance.Version}"
# compliance.Provider == "Azure" or "Kubernetes"
# provider_type == "azure" or "kubernetes"
if compliance.Provider.upper() == provider_type.upper():
if compliance_fw not in check_compliance:
check_compliance[compliance_fw] = []
for requirement in compliance.Requirements:
check_compliance[compliance_fw].append(requirement.Id)
return check_compliance
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
return {}
@@ -0,0 +1,48 @@
from prowler.lib.check.models import Check_Report
from prowler.lib.logger import logger
# TODO: this should be in the Check class
def get_check_compliance(
finding: Check_Report, provider_type: str, bulk_checks_metadata: dict
) -> dict:
"""get_check_compliance returns a map with the compliance framework as key and the requirements where the finding's check is present.
Example:
{
"CIS-1.4": ["2.1.3"],
"CIS-1.5": ["2.1.3"],
}
Args:
finding (Any): The Check_Report finding
provider_type (str): The provider type
bulk_checks_metadata (dict): The bulk checks metadata
Returns:
dict: The compliance framework as key and the requirements where the finding's check is present.
"""
try:
check_compliance = {}
# We have to retrieve all the check's compliance requirements
if finding.check_metadata.CheckID in bulk_checks_metadata:
for compliance in bulk_checks_metadata[
finding.check_metadata.CheckID
].Compliance:
compliance_fw = compliance.Framework
if compliance.Version:
compliance_fw = f"{compliance_fw}-{compliance.Version}"
# compliance.Provider == "Azure" or "Kubernetes"
# provider_type == "azure" or "kubernetes"
if compliance.Provider.upper() == provider_type.upper():
if compliance_fw not in check_compliance:
check_compliance[compliance_fw] = []
for requirement in compliance.Requirements:
check_compliance[compliance_fw].append(requirement.Id)
return check_compliance
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
return {}
@@ -0,0 +1,98 @@
from colorama import Fore, Style
from tabulate import tabulate
from prowler.config.config import orange_color
def get_essential_eight_table(
findings: list,
bulk_checks_metadata: dict,
compliance_framework: str,
output_filename: str,
output_directory: str,
compliance_overview: bool,
):
sections = {}
essential_eight_compliance_table = {
"Provider": [],
"Section": [],
"Status": [],
"Muted": [],
}
pass_count = []
fail_count = []
muted_count = []
for index, finding in enumerate(findings):
check = bulk_checks_metadata[finding.check_metadata.CheckID]
check_compliances = check.Compliance
for compliance in check_compliances:
if compliance.Framework == "Essential-Eight":
for requirement in compliance.Requirements:
for attribute in requirement.Attributes:
section = attribute.Section
if section not in sections:
sections[section] = {
"FAIL": 0,
"PASS": 0,
"Muted": 0,
}
if finding.muted:
if index not in muted_count:
muted_count.append(index)
sections[section]["Muted"] += 1
else:
if finding.status == "FAIL" and index not in fail_count:
fail_count.append(index)
sections[section]["FAIL"] += 1
elif finding.status == "PASS" and index not in pass_count:
pass_count.append(index)
sections[section]["PASS"] += 1
sections = dict(sorted(sections.items()))
for section in sections:
essential_eight_compliance_table["Provider"].append(compliance.Provider)
essential_eight_compliance_table["Section"].append(section)
if sections[section]["FAIL"] > 0:
essential_eight_compliance_table["Status"].append(
f"{Fore.RED}FAIL({sections[section]['FAIL']}){Style.RESET_ALL}"
)
elif sections[section]["PASS"] > 0:
essential_eight_compliance_table["Status"].append(
f"{Fore.GREEN}PASS({sections[section]['PASS']}){Style.RESET_ALL}"
)
else:
essential_eight_compliance_table["Status"].append("-")
essential_eight_compliance_table["Muted"].append(
f"{orange_color}{sections[section]['Muted']}{Style.RESET_ALL}"
)
if len(fail_count) + len(pass_count) + len(muted_count) > 1:
print(
f"\nCompliance Status of {Fore.YELLOW}{compliance_framework.upper()}{Style.RESET_ALL} Framework:"
)
total_findings_count = len(fail_count) + len(pass_count) + len(muted_count)
overview_table = [
[
f"{Fore.RED}{round(len(fail_count) / total_findings_count * 100, 2)}% ({len(fail_count)}) FAIL{Style.RESET_ALL}",
f"{Fore.GREEN}{round(len(pass_count) / total_findings_count * 100, 2)}% ({len(pass_count)}) PASS{Style.RESET_ALL}",
f"{orange_color}{round(len(muted_count) / total_findings_count * 100, 2)}% ({len(muted_count)}) MUTED{Style.RESET_ALL}",
]
]
print(tabulate(overview_table, tablefmt="rounded_grid"))
if not compliance_overview:
print(
f"\nFramework {Fore.YELLOW}{compliance_framework.upper()}{Style.RESET_ALL} Results:"
)
print(
tabulate(
essential_eight_compliance_table,
headers="keys",
tablefmt="rounded_grid",
)
)
print(
f"{Style.BRIGHT}* Only sections containing results appear.{Style.RESET_ALL}"
)
print(f"\nDetailed results of {compliance_framework.upper()} are in:")
print(
f" - CSV: {output_directory}/compliance/{output_filename}_{compliance_framework}.csv\n"
)
@@ -0,0 +1,111 @@
from prowler.config.config import timestamp
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.outputs.compliance.compliance_output import ComplianceOutput
from prowler.lib.outputs.compliance.essential_eight.models import (
EssentialEightAWSModel,
)
from prowler.lib.outputs.finding import Finding
class EssentialEightAWS(ComplianceOutput):
"""
This class represents the AWS ASD Essential Eight compliance output.
Attributes:
- _data (list): A list to store transformed data from findings.
- _file_descriptor (TextIOWrapper): A file descriptor to write data to a file.
Methods:
- transform: Transforms findings into AWS Essential Eight compliance format.
"""
def transform(
self,
findings: list[Finding],
compliance: Compliance,
compliance_name: str,
) -> None:
"""
Transforms a list of findings into AWS Essential Eight compliance format.
Parameters:
- findings (list): A list of findings.
- compliance (Compliance): A compliance model.
- compliance_name (str): The name of the compliance model.
Returns:
- None
"""
for finding in findings:
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = EssentialEightAWSModel(
Provider=finding.provider,
Description=compliance.Description,
AccountId=finding.account_uid,
Region=finding.region,
AssessmentDate=str(timestamp),
Requirements_Id=requirement.Id,
Requirements_Description=requirement.Description,
Requirements_Attributes_Section=attribute.Section,
Requirements_Attributes_MaturityLevel=attribute.MaturityLevel,
Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus,
Requirements_Attributes_CloudApplicability=attribute.CloudApplicability,
Requirements_Attributes_MitigatedThreats=", ".join(
attribute.MitigatedThreats
),
Requirements_Attributes_Description=attribute.Description,
Requirements_Attributes_RationaleStatement=attribute.RationaleStatement,
Requirements_Attributes_ImpactStatement=attribute.ImpactStatement,
Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure,
Requirements_Attributes_AuditProcedure=attribute.AuditProcedure,
Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation,
Requirements_Attributes_References=attribute.References,
Status=finding.status,
StatusExtended=finding.status_extended,
ResourceId=finding.resource_uid,
ResourceName=finding.resource_name,
CheckId=finding.check_id,
Muted=finding.muted,
Framework=compliance.Framework,
Name=compliance.Name,
)
self._data.append(compliance_row)
# Add manual requirements to the compliance output
for requirement in compliance.Requirements:
if not requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = EssentialEightAWSModel(
Provider=compliance.Provider.lower(),
Description=compliance.Description,
AccountId="",
Region="",
AssessmentDate=str(timestamp),
Requirements_Id=requirement.Id,
Requirements_Description=requirement.Description,
Requirements_Attributes_Section=attribute.Section,
Requirements_Attributes_MaturityLevel=attribute.MaturityLevel,
Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus,
Requirements_Attributes_CloudApplicability=attribute.CloudApplicability,
Requirements_Attributes_MitigatedThreats=", ".join(
attribute.MitigatedThreats
),
Requirements_Attributes_Description=attribute.Description,
Requirements_Attributes_RationaleStatement=attribute.RationaleStatement,
Requirements_Attributes_ImpactStatement=attribute.ImpactStatement,
Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure,
Requirements_Attributes_AuditProcedure=attribute.AuditProcedure,
Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation,
Requirements_Attributes_References=attribute.References,
Status="MANUAL",
StatusExtended="Manual check",
ResourceId="manual_check",
ResourceName="Manual check",
CheckId="manual",
Muted=False,
Framework=compliance.Framework,
Name=compliance.Name,
)
self._data.append(compliance_row)
@@ -0,0 +1,35 @@
from pydantic.v1 import BaseModel
class EssentialEightAWSModel(BaseModel):
"""
EssentialEightAWSModel generates a finding's output in AWS ASD Essential Eight Compliance format.
"""
Provider: str
Description: str
AccountId: str
Region: str
AssessmentDate: str
Requirements_Id: str
Requirements_Description: str
Requirements_Attributes_Section: str
Requirements_Attributes_MaturityLevel: str
Requirements_Attributes_AssessmentStatus: str
Requirements_Attributes_CloudApplicability: str
Requirements_Attributes_MitigatedThreats: str
Requirements_Attributes_Description: str
Requirements_Attributes_RationaleStatement: str
Requirements_Attributes_ImpactStatement: str
Requirements_Attributes_RemediationProcedure: str
Requirements_Attributes_AuditProcedure: str
Requirements_Attributes_AdditionalInformation: str
Requirements_Attributes_References: str
Status: str
StatusExtended: str
ResourceId: str
ResourceName: str
CheckId: str
Muted: bool
Framework: str
Name: str
@@ -1,6 +1,7 @@
import json
import os
from datetime import datetime
from typing import List
from typing import TYPE_CHECKING, List
from py_ocsf_models.events.base_event import SeverityID
from py_ocsf_models.events.base_event import StatusID as EventStatusID
@@ -20,11 +21,12 @@ from py_ocsf_models.objects.resource_details import ResourceDetails
from prowler.config.config import prowler_version
from prowler.lib.check.compliance_models import ComplianceFramework
from prowler.lib.logger import logger
from prowler.lib.outputs.finding import Finding
from prowler.lib.outputs.ocsf.ocsf import OCSF
from prowler.lib.outputs.utils import unroll_dict_to_list
from prowler.lib.utils.utils import open_file
if TYPE_CHECKING:
from prowler.lib.outputs.finding import Finding
PROWLER_TO_COMPLIANCE_STATUS = {
"PASS": ComplianceStatusID.Pass,
"FAIL": ComplianceStatusID.Fail,
@@ -32,6 +34,40 @@ PROWLER_TO_COMPLIANCE_STATUS = {
}
def _sanitize_resource_data(resource_details, resource_metadata) -> dict:
"""Ensure resource data is JSON-serializable.
Service resource_metadata may carry non-serializable objects (e.g. raw
Pydantic models or service classes such as ``Trail`` / ``LifecyclePolicy``).
Convert them to plain dicts and roundtrip through JSON so the resulting
ComplianceFinding can be serialized without errors.
"""
def _make_serializable(obj):
if hasattr(obj, "model_dump") and callable(obj.model_dump):
return _make_serializable(obj.model_dump())
if hasattr(obj, "dict") and callable(obj.dict):
return _make_serializable(obj.dict())
if isinstance(obj, dict):
return {str(k): _make_serializable(v) for k, v in obj.items()}
if isinstance(obj, (list, tuple)):
return [_make_serializable(v) for v in obj]
return obj
try:
converted = _make_serializable(resource_metadata)
sanitized_metadata = json.loads(json.dumps(converted, default=str))
except (TypeError, ValueError, RecursionError) as error:
logger.warning(
f"Failed to serialize resource metadata, defaulting to empty: {error}"
)
sanitized_metadata = {}
return {
"details": resource_details,
"metadata": sanitized_metadata,
}
def _to_snake_case(name: str) -> str:
"""Convert a PascalCase or camelCase string to snake_case."""
import re
@@ -108,7 +144,7 @@ class OCSFComplianceOutput:
def _transform(
self,
findings: List[Finding],
findings: List["Finding"],
framework: ComplianceFramework,
compliance_name: str,
) -> None:
@@ -177,7 +213,7 @@ class OCSFComplianceOutput:
def _build_compliance_finding(
self,
finding: Finding,
finding: "Finding",
framework: ComplianceFramework,
requirement,
compliance_name: str,
@@ -195,7 +231,9 @@ class OCSFComplianceOutput:
finding.metadata.Severity.capitalize(),
SeverityID.Unknown,
)
event_status = OCSF.get_finding_status_id(finding.muted)
event_status = (
EventStatusID.Suppressed if finding.muted else EventStatusID.New
)
time_value = (
int(finding.timestamp.timestamp())
@@ -268,10 +306,10 @@ class OCSFComplianceOutput:
if finding.provider == "kubernetes"
else None
),
data={
"details": finding.resource_details,
"metadata": finding.resource_metadata,
},
data=_sanitize_resource_data(
finding.resource_details,
finding.resource_metadata,
),
)
],
severity_id=finding_severity.value,
@@ -0,0 +1,294 @@
from csv import DictWriter
from pathlib import Path
from typing import TYPE_CHECKING, Optional
from pydantic.v1 import create_model
from prowler.config.config import timestamp
from prowler.lib.check.compliance_models import ComplianceFramework
from prowler.lib.logger import logger
from prowler.lib.utils.utils import open_file
if TYPE_CHECKING:
from prowler.lib.outputs.finding import Finding
PROVIDER_HEADER_MAP = {
"aws": ("AccountId", "account_uid", "Region", "region"),
"azure": ("SubscriptionId", "account_uid", "Location", "region"),
"gcp": ("ProjectId", "account_uid", "Location", "region"),
"kubernetes": ("Context", "account_name", "Namespace", "region"),
"m365": ("TenantId", "account_uid", "Location", "region"),
"github": ("Account_Name", "account_name", "Account_Id", "account_uid"),
"oraclecloud": ("TenancyId", "account_uid", "Region", "region"),
"alibabacloud": ("AccountId", "account_uid", "Region", "region"),
"nhn": ("AccountId", "account_uid", "Region", "region"),
}
_DEFAULT_HEADERS = ("AccountId", "account_uid", "Region", "region")
class UniversalComplianceOutput:
"""Universal compliance CSV output driven by ComplianceFramework metadata.
Dynamically builds a Pydantic row model from attributes_metadata so that
CSV columns match the framework's declared attribute fields.
"""
def __init__(
self,
findings: list,
framework: ComplianceFramework,
file_path: str = None,
from_cli: bool = True,
provider: str = None,
) -> None:
self._data = []
self._file_descriptor = None
self.file_path = file_path
self._from_cli = from_cli
self._provider = provider
self.close_file = False
if file_path:
path_obj = Path(file_path)
self._file_extension = path_obj.suffix if path_obj.suffix else ""
if findings:
self._row_model = self._build_row_model(framework)
compliance_name = (
framework.framework + "-" + framework.version
if framework.version
else framework.framework
)
self._transform(findings, framework, compliance_name)
if not self._file_descriptor and file_path:
self._create_file_descriptor(file_path)
@property
def data(self):
return self._data
def _build_row_model(self, framework: ComplianceFramework):
"""Build a dynamic Pydantic model from attributes_metadata."""
acct_header, acct_field, loc_header, loc_field = PROVIDER_HEADER_MAP.get(
(self._provider or "").lower(), _DEFAULT_HEADERS
)
self._acct_header = acct_header
self._acct_field = acct_field
self._loc_header = loc_header
self._loc_field = loc_field
# Base fields present in every compliance CSV
fields = {
"Provider": (str, ...),
"Description": (str, ...),
acct_header: (str, ...),
loc_header: (str, ...),
"AssessmentDate": (str, ...),
"Requirements_Id": (str, ...),
"Requirements_Description": (str, ...),
}
# Dynamic attribute columns from metadata
if framework.attributes_metadata:
for attr_meta in framework.attributes_metadata:
if not attr_meta.output_formats.csv:
continue
field_name = f"Requirements_Attributes_{attr_meta.key}"
# Map type strings to Python types
type_map = {
"str": Optional[str],
"int": Optional[int],
"float": Optional[float],
"bool": Optional[bool],
"list_str": Optional[str], # Serialized as joined string
"list_dict": Optional[str], # Serialized as string
}
py_type = type_map.get(attr_meta.type, Optional[str])
fields[field_name] = (py_type, None)
# Check if any requirement has MITRE fields
has_mitre = any(req.tactics for req in framework.requirements if req.tactics)
if has_mitre:
fields["Requirements_Tactics"] = (Optional[str], None)
fields["Requirements_SubTechniques"] = (Optional[str], None)
fields["Requirements_Platforms"] = (Optional[str], None)
fields["Requirements_TechniqueURL"] = (Optional[str], None)
# Trailing fields
fields["Status"] = (str, ...)
fields["StatusExtended"] = (str, ...)
fields["ResourceId"] = (str, ...)
fields["ResourceName"] = (str, ...)
fields["CheckId"] = (str, ...)
fields["Muted"] = (bool, ...)
fields["Framework"] = (str, ...)
fields["Name"] = (str, ...)
return create_model("UniversalComplianceRow", **fields)
def _serialize_attr_value(self, value):
"""Serialize attribute values for CSV."""
if isinstance(value, list):
if value and isinstance(value[0], dict):
return str(value)
return " | ".join(str(v) for v in value)
return value
def _build_row(self, finding, framework, requirement, is_manual=False):
"""Build a single row dict for a finding + requirement combination."""
row = {
"Provider": (
finding.provider
if not is_manual
else (framework.provider or self._provider or "").lower()
),
"Description": framework.description,
self._acct_header: (
getattr(finding, self._acct_field, "") if not is_manual else ""
),
self._loc_header: (
getattr(finding, self._loc_field, "") if not is_manual else ""
),
"AssessmentDate": str(timestamp),
"Requirements_Id": requirement.id,
"Requirements_Description": requirement.description,
}
# Add dynamic attribute columns
if framework.attributes_metadata:
for attr_meta in framework.attributes_metadata:
if not attr_meta.output_formats.csv:
continue
field_name = f"Requirements_Attributes_{attr_meta.key}"
raw_val = requirement.attributes.get(attr_meta.key)
row[field_name] = (
self._serialize_attr_value(raw_val) if raw_val is not None else None
)
# MITRE fields
if requirement.tactics:
row["Requirements_Tactics"] = (
" | ".join(requirement.tactics) if requirement.tactics else None
)
row["Requirements_SubTechniques"] = (
" | ".join(requirement.sub_techniques)
if requirement.sub_techniques
else None
)
row["Requirements_Platforms"] = (
" | ".join(requirement.platforms) if requirement.platforms else None
)
row["Requirements_TechniqueURL"] = requirement.technique_url
row["Status"] = finding.status if not is_manual else "MANUAL"
row["StatusExtended"] = (
finding.status_extended if not is_manual else "Manual check"
)
row["ResourceId"] = finding.resource_uid if not is_manual else "manual_check"
row["ResourceName"] = finding.resource_name if not is_manual else "Manual check"
row["CheckId"] = finding.check_id if not is_manual else "manual"
row["Muted"] = finding.muted if not is_manual else False
row["Framework"] = framework.framework
row["Name"] = framework.name
return row
def _transform(
self,
findings: list["Finding"],
framework: ComplianceFramework,
compliance_name: str,
) -> None:
"""Transform findings into universal compliance CSV rows."""
# Build check -> requirements map (filtered by provider for dict checks)
check_req_map = {}
for req in framework.requirements:
checks = req.checks
if self._provider:
all_checks = checks.get(self._provider.lower(), [])
else:
all_checks = []
for check_list in checks.values():
all_checks.extend(check_list)
for check_id in all_checks:
if check_id not in check_req_map:
check_req_map[check_id] = []
check_req_map[check_id].append(req)
# Process findings using the provider-filtered check_req_map.
# This ensures that for multi-provider dict checks, only the checks
# belonging to the current provider produce output rows.
for finding in findings:
check_id = finding.check_id
if check_id in check_req_map:
for req in check_req_map[check_id]:
row = self._build_row(finding, framework, req)
try:
self._data.append(self._row_model(**row))
except Exception as e:
logger.debug(f"Skipping row for {req.id}: {e}")
# Manual requirements (no checks or empty dict)
for req in framework.requirements:
checks = req.checks
if self._provider:
has_checks = bool(checks.get(self._provider.lower(), []))
else:
has_checks = any(checks.values())
if not has_checks:
# Use a dummy finding-like namespace for manual rows
row = self._build_row(
_ManualFindingStub(), framework, req, is_manual=True
)
try:
self._data.append(self._row_model(**row))
except Exception as e:
logger.debug(f"Skipping manual row for {req.id}: {e}")
def _create_file_descriptor(self, file_path: str) -> None:
try:
self._file_descriptor = open_file(file_path, "a")
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def batch_write_data_to_file(self) -> None:
"""Write findings data to CSV."""
try:
if (
getattr(self, "_file_descriptor", None)
and not self._file_descriptor.closed
and self._data
):
csv_writer = DictWriter(
self._file_descriptor,
fieldnames=[field.upper() for field in self._data[0].dict().keys()],
delimiter=";",
)
if self._file_descriptor.tell() == 0:
csv_writer.writeheader()
for row in self._data:
csv_writer.writerow({k.upper(): v for k, v in row.dict().items()})
if self.close_file or self._from_cli:
self._file_descriptor.close()
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class _ManualFindingStub:
"""Minimal stub to satisfy _build_row for manual requirements."""
provider = ""
account_uid = ""
account_name = ""
region = ""
status = "MANUAL"
status_extended = "Manual check"
resource_uid = "manual_check"
resource_name = "Manual check"
check_id = "manual"
muted = False
+3 -2
View File
@@ -15,7 +15,7 @@ from prowler.lib.check.models import (
)
from prowler.lib.logger import logger
from prowler.lib.outputs.common import Status, fill_common_finding_data
from prowler.lib.outputs.compliance.compliance import get_check_compliance
from prowler.lib.outputs.compliance.compliance_check import get_check_compliance
from prowler.lib.outputs.utils import unroll_tags
from prowler.lib.utils.utils import dict_to_lowercase, get_nested_attribute
from prowler.providers.common.provider import Provider
@@ -245,15 +245,16 @@ class Finding(BaseModel):
elif provider.type == "kubernetes":
if provider.identity.context == "In-Cluster":
output_data["auth_method"] = "in-cluster"
output_data["provider_uid"] = provider.identity.cluster
else:
output_data["auth_method"] = "kubeconfig"
output_data["provider_uid"] = provider.identity.context
output_data["resource_name"] = check_output.resource_name
output_data["resource_uid"] = check_output.resource_id
output_data["account_name"] = f"context: {provider.identity.context}"
output_data["account_uid"] = get_nested_attribute(
provider, "identity.cluster"
)
output_data["provider_uid"] = provider.identity.context
output_data["region"] = f"namespace: {check_output.namespace}"
elif provider.type == "github":
-316
View File
@@ -1,316 +0,0 @@
"""ASPM (Agent Security Posture Management) Provider.
Reads an agent manifest file (YAML or JSON) that describes the security
configuration of deployed AI agents and exposes the parsed agent list to the
check engine.
Manifest format (YAML):
agents:
- id: agent-001
name: agent-docrecommender-prod
environment: prod # prod | staging | dev
cloud_provider: aws # aws | azure | gcp
region: us-east-1
identity:
type: iam_role
arn: arn:aws:iam::123456789012:role/agent-docrecommender-prod
tags:
agent: "true"
owner: team-ai
env: prod
purpose: document-recommendation
created_at: "2025-01-15"
last_used: "2026-03-01"
uses_oidc: true
uses_static_credentials: false
credential_age_days: 45
rotation_policy_days: 90
naming_compliant: true
has_owner_tag: true
session_duration_seconds: 3600
permissions:
has_wildcard_actions: false
has_admin_policy: false
has_permission_boundary: true
shares_role_with_human: false
data_domains_accessed: ["s3"]
credentials:
uses_secrets_manager: true
has_hardcoded_secrets: false
rotation_interval_days: 30
credentials_per_environment: true
network:
uses_https_only: true
has_egress_filtering: true
has_rate_limiting: true
validates_tls_certificates: true
data_access:
accesses_pii: false
data_encrypted_at_rest: true
data_encrypted_in_transit: true
runtime:
runs_as_root: false
privileged_container: false
has_resource_limits: true
image_scanned_for_cves: true
supply_chain:
framework_cves_scanned: true
dependencies_version_pinned: true
artifacts_signed: true
observability:
execution_logs_complete: true
audit_logs_immutable: true
security_event_alerting: true
compliance:
owasp_llm_top10_assessed: true
incident_response_plan_exists: true
attack_paths:
cross_cloud_escalation_possible: false
compromise_enables_full_account_takeover: false
"""
import json
import sys
from typing import List, Optional
import yaml
from colorama import Fore, Style
from prowler.lib.logger import logger
from prowler.lib.utils.utils import print_boxes
from prowler.providers.aspm.exceptions.exceptions import (
ASPMManifestInvalidError,
ASPMManifestNotFoundError,
ASPMNoAgentsFoundError,
)
from prowler.providers.aspm.models import AgentConfig
from prowler.providers.common.models import Audit_Metadata
from prowler.providers.common.provider import Provider
class AspmProvider(Provider):
"""Provider for AI Agent Security Posture Management (ASPM).
Parses an agent manifest file and exposes the list of agent configurations
to the check engine.
Attributes:
_type: Provider type identifier ("aspm").
manifest_path: Path to the agent manifest file.
agents: Parsed and validated list of AgentConfig objects.
environment_filter: Optional environment filter (prod/staging/dev).
cloud_provider_filter: Optional cloud provider filter (aws/azure/gcp).
audit_metadata: Prowler audit metadata.
"""
_type: str = "aspm"
audit_metadata: Audit_Metadata
def __init__(
self,
manifest_path: str = "aspm-manifest.yaml",
environment: Optional[str] = None,
cloud_provider: Optional[str] = None,
config_path: Optional[str] = None,
config_content: Optional[dict] = None,
fixer_config: dict = {},
provider_uid: Optional[str] = None,
) -> None:
"""Initialise the ASPM provider.
Args:
manifest_path: Path to the YAML/JSON agent manifest file.
environment: Optional filter only assess agents in this env.
cloud_provider: Optional filter only assess agents on this cloud.
config_path: Prowler global config file path.
config_content: Prowler global config as a dict.
fixer_config: Fixer configuration.
provider_uid: Unique identifier for push-to-cloud integration.
"""
logger.info("Instantiating ASPM Provider...")
self.manifest_path = manifest_path
self.environment_filter = environment
self.cloud_provider_filter = cloud_provider
self._provider_uid = provider_uid
self._session = None
self._identity = "prowler"
self._auth_method = "No auth"
self.region = "global"
self.audited_account = "local-aspm"
# Load and parse the manifest
self.agents: List[AgentConfig] = self._load_manifest()
# Audit config
from prowler.config.config import (
default_config_file_path,
load_and_validate_config_file,
)
if config_content:
self._audit_config = config_content
elif config_path and config_path != default_config_file_path:
self._audit_config = load_and_validate_config_file(self._type, config_path)
else:
self._audit_config = {}
self._fixer_config = fixer_config
self._mutelist = None
self.audit_metadata = Audit_Metadata(
provider=self._type,
account_id=self.audited_account,
account_name="aspm",
region=self.region,
services_scanned=0,
expected_checks=[],
completed_checks=0,
audit_progress=0,
)
Provider.set_global_provider(self)
# ------------------------------------------------------------------
# Provider interface (abstract method implementations)
# ------------------------------------------------------------------
@property
def type(self) -> str:
"""Provider type identifier."""
return self._type
@property
def identity(self) -> str:
"""Provider identity string."""
return self._identity
@property
def session(self):
"""ASPM provider has no cloud session."""
return self._session
@property
def audit_config(self) -> dict:
"""Prowler audit configuration."""
return self._audit_config
@property
def fixer_config(self) -> dict:
"""Fixer configuration."""
return self._fixer_config
@property
def auth_method(self) -> str:
"""Authentication method description."""
return self._auth_method
def setup_session(self) -> None:
"""ASPM provider does not require a cloud session."""
def print_credentials(self) -> None:
"""Display provider summary in the CLI output."""
report_title = (
f"{Style.BRIGHT}Scanning AI Agent Security Posture:{Style.RESET_ALL}"
)
report_lines = [
f"Manifest: {Fore.YELLOW}{self.manifest_path}{Style.RESET_ALL}",
f"Agents loaded: {Fore.YELLOW}{len(self.agents)}{Style.RESET_ALL}",
]
if self.environment_filter:
report_lines.append(
f"Environment filter: {Fore.YELLOW}{self.environment_filter}{Style.RESET_ALL}"
)
if self.cloud_provider_filter:
report_lines.append(
f"Cloud provider filter: {Fore.YELLOW}{self.cloud_provider_filter}{Style.RESET_ALL}"
)
print_boxes(report_lines, report_title)
# ------------------------------------------------------------------
# Manifest loading
# ------------------------------------------------------------------
def _load_manifest(self) -> List[AgentConfig]:
"""Load and parse the agent manifest file.
Returns:
A list of validated AgentConfig objects.
Raises:
SystemExit: On unrecoverable manifest errors.
"""
import os
if not os.path.exists(self.manifest_path):
logger.critical(ASPMManifestNotFoundError(self.manifest_path).message)
sys.exit(1)
try:
with open(self.manifest_path, "r", encoding="utf-8") as fh:
if self.manifest_path.endswith(".json"):
raw = json.load(fh)
else:
raw = yaml.safe_load(fh)
except Exception as exc:
logger.critical(
ASPMManifestInvalidError(self.manifest_path, str(exc)).message
)
sys.exit(1)
if not isinstance(raw, dict) or "agents" not in raw:
logger.critical(
ASPMManifestInvalidError(
self.manifest_path,
"Root key 'agents' not found. "
"The manifest must contain a top-level 'agents' list.",
).message
)
sys.exit(1)
raw_agents = raw.get("agents", [])
if not raw_agents:
logger.critical(ASPMNoAgentsFoundError().message)
sys.exit(1)
agents: List[AgentConfig] = []
for entry in raw_agents:
try:
agent = AgentConfig(**entry)
# Apply optional filters
if (
self.environment_filter
and agent.environment != self.environment_filter
):
continue
if (
self.cloud_provider_filter
and agent.cloud_provider != self.cloud_provider_filter
):
continue
agents.append(agent)
except Exception as exc:
agent_id = entry.get("id", "<unknown>")
logger.error(
f"Skipping agent '{agent_id}' — manifest validation error: {exc}"
)
if not agents:
logger.warning(
"No agents matched the specified filters. "
"The assessment will produce no findings."
)
logger.info(f"Loaded {len(agents)} agent(s) from {self.manifest_path}")
return agents
@@ -1,36 +0,0 @@
"""ASPM Provider exceptions."""
class ASPMBaseException(Exception):
"""Base exception for the ASPM provider."""
def __init__(self, message: str = ""):
self.message = message
super().__init__(self.message)
class ASPMManifestNotFoundError(ASPMBaseException):
"""Raised when the ASPM agent manifest file is not found."""
def __init__(self, path: str):
super().__init__(f"ASPM manifest file not found: {path}")
class ASPMManifestInvalidError(ASPMBaseException):
"""Raised when the ASPM agent manifest file cannot be parsed."""
def __init__(self, path: str, detail: str = ""):
msg = f"ASPM manifest file is invalid: {path}"
if detail:
msg += f"{detail}"
super().__init__(msg)
class ASPMNoAgentsFoundError(ASPMBaseException):
"""Raised when the manifest contains no agents to assess."""
def __init__(self):
super().__init__(
"No agents found in the ASPM manifest. "
"Ensure the manifest contains at least one entry under 'agents'."
)
@@ -1,67 +0,0 @@
"""ASPM Provider CLI argument definitions."""
def init_parser(self):
"""Init the ASPM Provider CLI parser."""
aspm_parser = self.subparsers.add_parser(
"aspm",
parents=[self.common_providers_parser],
help="Agent Security Posture Management (ASPM) Provider (Beta)",
)
aspm_scan_group = aspm_parser.add_argument_group("ASPM Scan Options")
aspm_scan_group.add_argument(
"--manifest-path",
"-M",
dest="manifest_path",
default="aspm-manifest.yaml",
help=(
"Path to the ASPM agent manifest file (YAML or JSON) describing "
"deployed AI agent security configurations. "
"Default: aspm-manifest.yaml"
),
)
aspm_scan_group.add_argument(
"--environment",
dest="environment",
default=None,
choices=["prod", "staging", "dev"],
help=(
"Filter the assessment to agents in a specific environment. "
"Default: all environments."
),
)
aspm_scan_group.add_argument(
"--cloud-provider",
dest="cloud_provider",
default=None,
choices=["aws", "azure", "gcp"],
help=(
"Filter the assessment to agents running on a specific cloud provider. "
"Default: all cloud providers."
),
)
aspm_scan_group.add_argument(
"--provider-uid",
dest="provider_uid",
default=None,
help="Unique identifier for this ASPM scan (used with --push-to-cloud).",
)
def validate_arguments(arguments):
"""Validate ASPM provider arguments."""
import os
manifest_path = getattr(arguments, "manifest_path", "aspm-manifest.yaml")
if not os.path.exists(manifest_path):
return (
False,
f"ASPM manifest file not found: '{manifest_path}'. "
"Use --manifest-path to specify the correct path.",
)
return (True, "")
@@ -1,28 +0,0 @@
"""ASPM base service class."""
from prowler.lib.logger import logger
from prowler.providers.aspm.aspm_provider import AspmProvider
class AspmService:
"""Base class for all ASPM services.
Each subclass is responsible for a specific check category (identity,
permissions, credentials, ). On construction the service receives the
global AspmProvider instance and exposes the filtered list of agents that
the checks iterate over.
Attributes:
provider: The active AspmProvider instance.
agents: The list of AgentConfig objects to assess.
"""
def __init__(self, provider: AspmProvider) -> None:
"""Initialise the service with a reference to the provider.
Args:
provider: The active AspmProvider instance.
"""
logger.info(f"Initialising {self.__class__.__name__}...")
self.provider = provider
self.agents = provider.agents
-573
View File
@@ -1,573 +0,0 @@
"""ASPM Provider data models.
These models represent the security posture of AI agent deployments as
declared in an agent manifest file (YAML/JSON). Each model field
corresponds directly to a check category defined in the ASPM check suite.
"""
from __future__ import annotations
from datetime import date
from typing import Dict, List, Optional
from pydantic import BaseModel, Field
from prowler.config.config import output_file_timestamp
from prowler.providers.common.models import ProviderOutputOptions
# ---------------------------------------------------------------------------
# Sub-models per check category
# ---------------------------------------------------------------------------
class AgentIdentityConfig(BaseModel):
"""Identity & Authentication configuration for an AI agent."""
type: str = Field(
default="iam_role",
description="Credential type: iam_role | managed_identity | service_account | api_key",
)
arn: Optional[str] = Field(default=None, description="Full ARN / resource ID")
tags: Dict[str, str] = Field(
default_factory=dict,
description="Tags applied to the identity resource",
)
created_at: Optional[date] = Field(default=None, description="Creation date")
last_used: Optional[date] = Field(
default=None, description="Last authentication date"
)
uses_oidc: bool = Field(
default=False,
description="Whether OIDC/Workload Identity federation is used instead of static keys",
)
uses_static_credentials: bool = Field(
default=True,
description="Whether static (long-lived) credentials are used",
)
credential_age_days: Optional[int] = Field(
default=None,
description="Age of the credentials in days (None = unknown)",
)
rotation_policy_days: Optional[int] = Field(
default=None,
description="Maximum allowed credential age before rotation (days)",
)
naming_compliant: bool = Field(
default=True,
description="Whether the identity name follows organisational naming conventions",
)
has_owner_tag: bool = Field(
default=False,
description="Whether the identity has an 'owner' tag linking it to a team",
)
cross_cloud_registered: bool = Field(
default=True,
description="For multi-cloud: whether the identity is registered in all target clouds",
)
jwt_validation_enabled: bool = Field(
default=False,
description="Whether agent-to-agent JWT claims (exp, iss, sub, aud) are validated",
)
session_duration_seconds: Optional[int] = Field(
default=None,
description="Max assumed-role session duration in seconds (None = unlimited)",
)
has_deprovisioning_record: bool = Field(
default=True,
description="Whether a deprovisioning record / SOP exists for this identity",
)
oauth_scope_minimal: bool = Field(
default=True,
description="Whether OAuth tokens are requested with minimal required scopes",
)
unused_secondary_credentials: bool = Field(
default=False,
description="Whether unused secondary credentials (backup keys) exist",
)
class AgentPermissionsConfig(BaseModel):
"""Permissions & Least Privilege configuration for an AI agent."""
has_wildcard_actions: bool = Field(
default=False,
description="Whether any policy grants wildcard actions (s3:*, *:*)",
)
has_wildcard_resources: bool = Field(
default=False,
description="Whether any policy grants wildcard resource ARNs (*)",
)
has_admin_policy: bool = Field(
default=False,
description="Whether an admin or power-user managed policy is attached",
)
has_inline_policies: bool = Field(
default=False,
description="Whether inline policies (instead of managed) are attached",
)
can_escalate_privileges: bool = Field(
default=False,
description="Whether the agent can escalate to human admin roles",
)
cross_account_access: bool = Field(
default=False,
description="Whether the agent has cross-account permissions",
)
cross_account_accounts: int = Field(
default=0,
description="Number of accounts the agent can access cross-account",
)
has_permission_boundary: bool = Field(
default=False,
description="Whether a permission boundary enforces the maximum permission set",
)
shares_role_with_human: bool = Field(
default=False,
description="Whether humans share the same role as this agent",
)
session_duration_seconds: Optional[int] = Field(
default=None,
description="Max session duration in seconds for assumed roles",
)
permissions_last_reviewed_days: Optional[int] = Field(
default=None,
description="Days since permissions were last reviewed (None = never)",
)
data_domains_accessed: List[str] = Field(
default_factory=list,
description="List of data domains accessible (e.g. ['s3', 'rds', 'redshift'])",
)
has_condition_on_sensitive_actions: bool = Field(
default=True,
description="Whether conditions (IP, tag, time) restrict high-risk permissions",
)
all_resources_tagged: bool = Field(
default=False,
description="Whether agent service principals carry all required governance tags",
)
permission_changes_approved: bool = Field(
default=True,
description="Whether all permission changes are traceable to approved change requests",
)
class AgentCredentialsConfig(BaseModel):
"""Credential Management configuration for an AI agent."""
has_hardcoded_secrets: bool = Field(
default=False,
description="Whether hardcoded credentials exist in code, IaC, or manifests",
)
credentials_in_logs: bool = Field(
default=False,
description="Whether credentials appear in logs or error messages",
)
uses_secrets_manager: bool = Field(
default=False,
description="Whether credentials are retrieved from a cloud secrets manager",
)
api_key_in_vcs: bool = Field(
default=False,
description="Whether API keys or tokens have been committed to version control",
)
rotation_interval_days: Optional[int] = Field(
default=None,
description="Actual credential rotation interval in days (None = no rotation)",
)
secrets_in_iac: bool = Field(
default=False,
description="Whether secrets are embedded in Terraform / CloudFormation",
)
database_uses_proxy: bool = Field(
default=False,
description="Whether database connections use a managed proxy (RDS Proxy / Cloud SQL Proxy)",
)
third_party_keys_managed: bool = Field(
default=True,
description="Whether third-party API keys (Slack, GitHub, etc.) are in secrets manager",
)
credential_access_audit_trail: bool = Field(
default=False,
description="Whether credential access (GetSecretValue, etc.) is logged and monitored",
)
credentials_scoped: bool = Field(
default=True,
description="Whether credentials have minimal scope (not admin/full-access)",
)
credentials_per_environment: bool = Field(
default=True,
description="Whether separate credentials are used per environment (dev/staging/prod)",
)
class AgentNetworkConfig(BaseModel):
"""Network & Communication Security configuration for an AI agent."""
uses_https_only: bool = Field(
default=True,
description="Whether all agent API calls use HTTPS / TLS 1.2+",
)
mtls_enforced: bool = Field(
default=False,
description="Whether mTLS is enforced in the service mesh for agent-to-agent communication",
)
api_calls_authenticated: bool = Field(
default=True,
description="Whether all internal API calls require authentication",
)
has_rate_limiting: bool = Field(
default=False,
description="Whether rate limiting is configured on agent API endpoints",
)
has_egress_filtering: bool = Field(
default=False,
description="Whether outbound network access is filtered by destination",
)
network_isolated: bool = Field(
default=False,
description="Whether the agent runs in an isolated network segment",
)
api_gateway_enforced: bool = Field(
default=True,
description="Whether all API access routes through an authenticated API Gateway",
)
validates_tls_certificates: bool = Field(
default=True,
description="Whether TLS certificates are fully validated (chain, hostname, expiry)",
)
network_calls_logged: bool = Field(
default=False,
description="Whether all network calls are logged with source agent ID and destination",
)
uses_dnssec: bool = Field(
default=False,
description="Whether DNS queries use DNSSEC / DoH / DoT",
)
validates_webhooks: bool = Field(
default=True,
description="Whether incoming webhooks/callbacks are signature-validated",
)
class AgentDataAccessConfig(BaseModel):
"""Data Access & Privacy configuration for an AI agent."""
accesses_pii: bool = Field(
default=False,
description="Whether the agent can access Personally Identifiable Information",
)
has_dlp_controls: bool = Field(
default=False,
description="Whether Data Loss Prevention controls are enforced on PII access",
)
data_encrypted_at_rest: bool = Field(
default=True,
description="Whether all data stores accessed by the agent use encryption at rest",
)
data_encrypted_in_transit: bool = Field(
default=True,
description="Whether all data in transit is encrypted (TLS)",
)
cross_boundary_data_flows_approved: bool = Field(
default=True,
description="Whether cross-boundary data flows are whitelisted and documented",
)
training_data_integrity_verified: bool = Field(
default=False,
description="Whether training data sources are validated with integrity checks",
)
data_retention_policy_days: Optional[int] = Field(
default=None,
description="Maximum data retention period in days (None = no policy)",
)
database_query_audit_enabled: bool = Field(
default=False,
description="Whether database queries from the agent are fully audited",
)
object_storage_access_logged: bool = Field(
default=False,
description="Whether object storage (S3/Blob/GCS) access is logged",
)
llm_context_sanitized: bool = Field(
default=False,
description="Whether sensitive data is stripped from LLM context windows",
)
has_model_card: bool = Field(
default=False,
description="Whether the agent's model has a documented model card",
)
output_validated_for_sensitive_data: bool = Field(
default=False,
description="Whether agent outputs are validated and redacted for sensitive data",
)
supports_data_subject_rights: bool = Field(
default=False,
description="Whether the system supports GDPR/CCPA data subject access requests",
)
class AgentRuntimeConfig(BaseModel):
"""Runtime & Sandbox Security configuration for an AI agent."""
runs_as_root: bool = Field(
default=False,
description="Whether the agent container/process runs as root",
)
privileged_container: bool = Field(
default=False,
description="Whether the agent runs in a privileged container",
)
has_seccomp_profile: bool = Field(
default=False,
description="Whether a seccomp profile is applied to the container",
)
has_apparmor_selinux: bool = Field(
default=False,
description="Whether AppArmor or SELinux policy is applied",
)
has_resource_limits: bool = Field(
default=False,
description="Whether CPU, memory, and disk limits are configured",
)
image_scanned_for_cves: bool = Field(
default=False,
description="Whether the container image is scanned for vulnerabilities before deployment",
)
has_runtime_monitoring: bool = Field(
default=False,
description="Whether runtime security monitoring (Falco, Sysdig, etc.) is enabled",
)
execution_environment_versioned: bool = Field(
default=False,
description="Whether the execution environment uses pinned base images and IaC",
)
secrets_cleared_from_memory: bool = Field(
default=False,
description="Whether sensitive data is cleared from memory after use",
)
has_execution_timeout: bool = Field(
default=False,
description="Whether execution time limits are enforced",
)
behavior_deterministic: bool = Field(
default=True,
description="Whether the agent produces deterministic, reproducible behaviour",
)
dependencies_integrity_checked: bool = Field(
default=False,
description="Whether runtime dependencies are verified via checksums/signatures",
)
uses_platform_security_controls: bool = Field(
default=False,
description="Whether platform-native controls (Pod Security Standards, Binary Authorization) are applied",
)
class AgentSupplyChainConfig(BaseModel):
"""Supply Chain Security configuration for an AI agent."""
framework_cves_scanned: bool = Field(
default=False,
description="Whether agent frameworks (LangChain, etc.) are scanned for CVEs",
)
llm_model_provenance_verified: bool = Field(
default=False,
description="Whether the LLM model has verified provenance (checksum, signed source)",
)
plugins_security_reviewed: bool = Field(
default=False,
description="Whether all agent plugins/tools have been security-reviewed",
)
dependencies_version_pinned: bool = Field(
default=False,
description="Whether all dependencies use exact pinned versions with lock files",
)
artifacts_signed: bool = Field(
default=False,
description="Whether container images and artifacts are cryptographically signed",
)
cicd_has_security_gates: bool = Field(
default=False,
description="Whether the CI/CD pipeline includes secret scanning, SAST, and dependency scanning",
)
licenses_compliant: bool = Field(
default=True,
description="Whether all model and library licenses are documented and compliant",
)
model_update_cadence_days: Optional[int] = Field(
default=None,
description="Maximum allowed days between security updates to the LLM model",
)
dependency_checksums_verified: bool = Field(
default=False,
description="Whether package checksums/signatures are verified on download",
)
class AgentObservabilityConfig(BaseModel):
"""Observability & Monitoring configuration for an AI agent."""
execution_logs_complete: bool = Field(
default=False,
description="Whether execution logs capture actions, tools, decisions, and outputs",
)
anomaly_detection_enabled: bool = Field(
default=False,
description="Whether anomaly detection monitors for unusual agent behaviour",
)
prompt_injection_monitoring: bool = Field(
default=False,
description="Whether LLM inputs are monitored for prompt injection / jailbreak attempts",
)
audit_logs_immutable: bool = Field(
default=False,
description="Whether audit logs are immutable and integrity-protected",
)
metrics_exported: bool = Field(
default=False,
description="Whether key metrics (latency, error rate, resource usage) are exported",
)
security_event_alerting: bool = Field(
default=False,
description="Whether security events trigger alerts within 5 minutes",
)
distributed_tracing_enabled: bool = Field(
default=False,
description="Whether W3C trace context is propagated across agent service calls",
)
centralized_dashboard: bool = Field(
default=False,
description="Whether a centralised dashboard shows agent security posture",
)
configuration_drift_tracked: bool = Field(
default=False,
description="Whether configuration changes are tracked and drift from baseline detected",
)
performance_baseline_defined: bool = Field(
default=False,
description="Whether a performance baseline exists and degradation triggers alerts",
)
class AgentComplianceConfig(BaseModel):
"""Compliance & Governance configuration for an AI agent."""
owasp_llm_top10_assessed: bool = Field(
default=False,
description="Whether the agent has been assessed against the OWASP LLM Top 10",
)
eu_ai_act_controls_present: bool = Field(
default=False,
description="Whether EU AI Act compliance controls are documented",
)
nist_ai_rmf_assessed: bool = Field(
default=False,
description="Whether the agent has been assessed against the NIST AI RMF",
)
access_control_policy_enforced: bool = Field(
default=False,
description="Whether a documented access control policy is enforced and audited",
)
dpia_completed: bool = Field(
default=False,
description="Whether a Data Privacy Impact Assessment has been completed",
)
regulatory_requirements_mapped: bool = Field(
default=False,
description="Whether applicable regulations (HIPAA, PCI-DSS, etc.) are mapped",
)
incident_response_plan_exists: bool = Field(
default=False,
description="Whether an agent-specific incident response plan exists and is tested",
)
third_party_vendors_assessed: bool = Field(
default=False,
description="Whether third-party agent vendors have been security-assessed (SOC 2, ISO 27001)",
)
user_consent_and_disclosure: bool = Field(
default=False,
description="Whether users are informed and consent to agent actions on their behalf",
)
class AgentAttackPathsConfig(BaseModel):
"""Attack Path Analysis configuration for an AI agent."""
cross_cloud_escalation_possible: bool = Field(
default=False,
description="Whether the agent can chain identities to escalate privileges across clouds",
)
tool_abuse_escalation_possible: bool = Field(
default=False,
description="Whether agent tools can be abused to exceed the agent's declared permissions",
)
sensitive_data_enables_downstream_compromise: bool = Field(
default=False,
description="Whether data accessible to the agent contains credentials or social-engineering material",
)
lateral_movement_via_shared_infra: bool = Field(
default=False,
description="Whether the agent can access sibling agent infrastructure / shared services",
)
compromise_enables_full_account_takeover: bool = Field(
default=False,
description="Whether a compromised agent credential chain could lead to full account takeover",
)
llm_output_used_in_code_execution: bool = Field(
default=False,
description="Whether LLM output is used directly in system calls or exec() without validation",
)
# ---------------------------------------------------------------------------
# Top-level Agent model
# ---------------------------------------------------------------------------
class AgentConfig(BaseModel):
"""Full security posture declaration for a single AI agent deployment."""
id: str = Field(description="Unique identifier for this agent deployment")
name: str = Field(description="Human-readable agent name")
environment: str = Field(
default="unknown",
description="Deployment environment: prod | staging | dev | unknown",
)
cloud_provider: str = Field(
default="unknown",
description="Primary cloud provider: aws | azure | gcp | unknown",
)
region: str = Field(default="global", description="Primary deployment region")
identity: AgentIdentityConfig = Field(default_factory=AgentIdentityConfig)
permissions: AgentPermissionsConfig = Field(default_factory=AgentPermissionsConfig)
credentials: AgentCredentialsConfig = Field(default_factory=AgentCredentialsConfig)
network: AgentNetworkConfig = Field(default_factory=AgentNetworkConfig)
data_access: AgentDataAccessConfig = Field(default_factory=AgentDataAccessConfig)
runtime: AgentRuntimeConfig = Field(default_factory=AgentRuntimeConfig)
supply_chain: AgentSupplyChainConfig = Field(default_factory=AgentSupplyChainConfig)
observability: AgentObservabilityConfig = Field(
default_factory=AgentObservabilityConfig
)
compliance: AgentComplianceConfig = Field(default_factory=AgentComplianceConfig)
attack_paths: AgentAttackPathsConfig = Field(default_factory=AgentAttackPathsConfig)
def dict(self, **kwargs):
"""Return a serialisable dict (used by Check_Report)."""
return super().model_dump(**kwargs)
# ---------------------------------------------------------------------------
# Output options
# ---------------------------------------------------------------------------
class ASPMOutputOptions(ProviderOutputOptions):
"""ASPM-specific output options."""
def __init__(self, arguments, bulk_checks_metadata):
super().__init__(arguments, bulk_checks_metadata)
if not getattr(arguments, "output_filename", None):
self.output_filename = f"prowler-output-aspm-{output_file_timestamp}"
else:
self.output_filename = arguments.output_filename
@@ -1,36 +0,0 @@
{
"Provider": "aspm",
"CheckID": "aspm_agent_compromise_blast_radius_contained",
"CheckTitle": "AI Agent Compromise Blast Radius Must Be Contained — Full Account Takeover Must Not Be Possible",
"CheckType": [
"Software and Configuration Checks/AI Agent Security Best Practices"
],
"ServiceName": "attack_paths",
"SubServiceName": "",
"ResourceIdTemplate": "aspm-agent-{id}",
"Severity": "critical",
"ResourceType": "AiAgent",
"Description": "**AI Agent Attack Path**. Evaluates whether compromising the AI agent's credential chain could allow an attacker to achieve full cloud account takeover. This occurs when the agent's permissions — either directly or through role chaining — reach account-administrative actions such as creating IAM principals, modifying SCPs, disabling audit trails, or deleting critical infrastructure. A well-scoped agent should have a contained blast radius limited strictly to its functional domain.",
"Risk": "An agent with an unbounded blast radius transforms every prompt injection, supply-chain attack, or runtime exploit targeting the agent into a potential full account takeover. A single compromised agent can be used to create backdoor IAM users, disable CloudTrail, exfiltrate all secrets, and destroy production resources — all under the agent's legitimate identity with no immediate IAM-level alert.",
"RelatedUrl": "https://owasp.org/www-project-top-10-for-large-language-model-applications/",
"AdditionalURLs": [],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Apply a permission boundary to the agent's IAM role that prevents any account-administrative action (iam:CreateUser, iam:AttachRolePolicy, cloudtrail:StopLogging, organizations:*).\n2. Remove all attached admin or power-user managed policies and replace with a narrow functional policy.\n3. Enable AWS Service Control Policies (SCPs) or Azure Policy assignments that deny high-risk actions from agent identities.\n4. Conduct a blast-radius analysis using CIEM tooling (e.g., Wiz, Orca, Ermetic) and remediate all paths to account-level impact.\n5. Implement break-glass alerting: any attempt by the agent identity to call account-administrative APIs should page on-call immediately.",
"Terraform": ""
},
"Recommendation": {
"Text": "Apply permission boundaries and SCPs to hard-cap the agent's maximum achievable permissions at the account level. Remove any direct or indirect path to account-administrative actions. Validate the blast radius using CIEM tooling after every permission change.",
"Url": "https://hub.prowler.com/check/aspm_agent_compromise_blast_radius_contained"
}
},
"Categories": [
"trust-boundaries",
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "ASPM-100 attack path check from the Prowler ASPM check suite."
}
@@ -1,43 +0,0 @@
"""ASPM-100: AI agent compromise blast radius must be contained — full account takeover must not be possible."""
from prowler.lib.check.models import Check, CheckReportASPM
from prowler.providers.aspm.services.attack_paths.attack_paths_client import (
attack_paths_client,
)
class aspm_agent_compromise_blast_radius_contained(Check):
"""Check that compromising the agent cannot lead to full cloud account takeover.
Blast radius refers to the maximum damage achievable if an agent is fully
compromised. When an agent's credential chain — through role chaining,
permission boundaries being absent, or admin-level API access allows an
attacker to reach account-level administrative actions (e.g., creating new
IAM users, disabling CloudTrail, deleting all resources), the blast radius
is effectively unbounded. This check verifies that such a path does not
exist.
"""
def execute(self) -> list[CheckReportASPM]:
"""Execute the check against all loaded agents.
Returns:
A list of CheckReportASPM findings, one per agent.
"""
findings = []
for agent in attack_paths_client.agents:
report = CheckReportASPM(metadata=self.metadata(), resource=agent)
if agent.attack_paths.compromise_enables_full_account_takeover:
report.status = "FAIL"
report.status_extended = (
f"Agent {agent.name} compromise could enable full cloud account "
"takeover — critical blast radius."
)
else:
report.status = "PASS"
report.status_extended = (
f"Agent {agent.name} compromise blast radius is contained — full "
"account takeover is not possible."
)
findings.append(report)
return findings
@@ -1,36 +0,0 @@
{
"Provider": "aspm",
"CheckID": "aspm_agent_data_access_no_downstream_compromise",
"CheckTitle": "AI Agent Must Not Be Able to Access Data That Enables Downstream System or Human Compromise",
"CheckType": [
"Software and Configuration Checks/AI Agent Security Best Practices"
],
"ServiceName": "attack_paths",
"SubServiceName": "",
"ResourceIdTemplate": "aspm-agent-{id}",
"Severity": "critical",
"ResourceType": "AiAgent",
"Description": "**AI Agent Attack Path**. Verifies that data stores accessible to the AI agent do not contain credentials, API keys, internal tokens, or social-engineering material (employee PII, org charts, internal process documentation). When an agent can read such material, an adversary who manipulates the agent's reasoning — via prompt injection or a poisoned knowledge base — can exfiltrate high-value secrets and use them to compromise downstream systems or people.",
"Risk": "An agent with access to credential-rich or socially-exploitable data becomes a one-stop reconnaissance tool. An attacker who achieves even read-level influence over the agent can harvest database passwords, internal API keys, employee contact details, and internal playbooks — enabling cascading compromise far beyond the agent's own cloud permissions.",
"RelatedUrl": "https://owasp.org/www-project-top-10-for-large-language-model-applications/",
"AdditionalURLs": [],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Apply DLP (Data Loss Prevention) scanning to all data stores the agent reads to detect and block credentials and PII.\n2. Restrict the agent's read permissions to only the specific data objects it needs — avoid granting access to entire buckets, databases, or document libraries.\n3. Redact or tokenize sensitive fields (passwords, API keys, SSNs) before they are included in agent context windows or RAG retrievals.\n4. Implement output filtering to prevent the agent from returning raw credential strings or PII in its responses.\n5. Regularly audit the agent's accessible data scope using CSPM and CIEM tooling.",
"Terraform": ""
},
"Recommendation": {
"Text": "Scope the agent's data access to the minimum required objects and apply DLP controls to detect and block sensitive material from entering or leaving the agent's context. Redact credentials and PII at the data layer before they reach the agent.",
"Url": "https://hub.prowler.com/check/aspm_agent_data_access_no_downstream_compromise"
}
},
"Categories": [
"trust-boundaries",
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "ASPM-098 attack path check from the Prowler ASPM check suite."
}
@@ -1,42 +0,0 @@
"""ASPM-098: AI agent must not be able to access data that enables downstream compromise."""
from prowler.lib.check.models import Check, CheckReportASPM
from prowler.providers.aspm.services.attack_paths.attack_paths_client import (
attack_paths_client,
)
class aspm_agent_data_access_no_downstream_compromise(Check):
"""Check that AI agent-accessible data does not contain credentials or social-engineering material.
When an agent can read data stores containing credentials (database
connection strings, API keys, internal tokens) or material that can be
used for social engineering (employee PII, org-chart details, internal
process documentation), an attacker who manipulates the agent's reasoning
can harvest that material and use it to compromise downstream systems or
humans completely outside the agent's declared scope.
"""
def execute(self) -> list[CheckReportASPM]:
"""Execute the check against all loaded agents.
Returns:
A list of CheckReportASPM findings, one per agent.
"""
findings = []
for agent in attack_paths_client.agents:
report = CheckReportASPM(metadata=self.metadata(), resource=agent)
if agent.attack_paths.sensitive_data_enables_downstream_compromise:
report.status = "FAIL"
report.status_extended = (
f"Agent {agent.name} can access data containing credentials or "
"social-engineering material enabling downstream compromise."
)
else:
report.status = "PASS"
report.status_extended = (
f"Agent {agent.name} accessible data does not contain credentials "
"or social-engineering material."
)
findings.append(report)
return findings
@@ -1,36 +0,0 @@
{
"Provider": "aspm",
"CheckID": "aspm_agent_llm_output_not_executed",
"CheckTitle": "AI Agent Must Not Execute LLM Output Directly in System Calls or eval()",
"CheckType": [
"Software and Configuration Checks/AI Agent Security Best Practices"
],
"ServiceName": "attack_paths",
"SubServiceName": "",
"ResourceIdTemplate": "aspm-agent-{id}",
"Severity": "critical",
"ResourceType": "AiAgent",
"Description": "**AI Agent Attack Path**. Detects whether the AI agent passes raw LLM-generated text directly to code-execution primitives such as eval(), exec(), subprocess.run(shell=True), os.system(), or equivalent functions without sanitisation and structural allow-listing. When LLM output is executed verbatim, a successful prompt injection — delivered via a poisoned document, a malicious tool response, or an adversarial user prompt — becomes a Remote Code Execution (RCE) vulnerability running under the agent's cloud identity.",
"Risk": "Direct execution of LLM output converts every successful prompt injection into full Remote Code Execution on the agent host. An attacker who controls any input that reaches the LLM — retrieved document, user message, tool response — can instruct the model to emit a payload that the agent will execute verbatim. The resulting RCE runs under the agent's cloud identity, granting the attacker access to all cloud resources, secrets, and downstream systems the agent can reach.",
"RelatedUrl": "https://owasp.org/www-project-top-10-for-large-language-model-applications/",
"AdditionalURLs": [],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Never pass raw LLM output to eval(), exec(), subprocess, or shell commands — treat all LLM output as untrusted user input.\n2. Use structured output formats (JSON with a strict schema) instead of free-form text when the agent must drive code execution.\n3. Validate and allow-list LLM-generated structured payloads against a predefined schema before any execution step.\n4. Route all code execution through a sandboxed execution environment (gVisor, Firecracker, WebAssembly) with no network access and scoped file system access.\n5. Implement prompt injection detection on all LLM inputs (user messages, retrieved documents, tool responses) and refuse execution when injection patterns are detected.",
"Terraform": ""
},
"Recommendation": {
"Text": "Treat LLM output as untrusted input. Never pass it directly to code-execution primitives. Require structured, schema-validated payloads for any execution step, run code in isolated sandboxes, and implement prompt injection detection to prevent adversarial payloads from reaching the execution layer.",
"Url": "https://hub.prowler.com/check/aspm_agent_llm_output_not_executed"
}
},
"Categories": [
"trust-boundaries",
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "ASPM-101 attack path check from the Prowler ASPM check suite."
}
@@ -1,42 +0,0 @@
"""ASPM-101: AI agent must not execute LLM output directly in system calls or eval()."""
from prowler.lib.check.models import Check, CheckReportASPM
from prowler.providers.aspm.services.attack_paths.attack_paths_client import (
attack_paths_client,
)
class aspm_agent_llm_output_not_executed(Check):
"""Check that AI agents do not execute LLM output directly in system calls or eval().
When an agent passes raw LLM-generated text to system calls, eval(),
exec(), subprocess.run(shell=True), or equivalent code-execution
primitives without sanitisation and allow-listing, an adversary who
controls any part of the LLM's prompt — through prompt injection, a
poisoned document, or a malicious tool response can achieve Remote Code
Execution (RCE) on the agent host with the agent's privileges.
"""
def execute(self) -> list[CheckReportASPM]:
"""Execute the check against all loaded agents.
Returns:
A list of CheckReportASPM findings, one per agent.
"""
findings = []
for agent in attack_paths_client.agents:
report = CheckReportASPM(metadata=self.metadata(), resource=agent)
if agent.attack_paths.llm_output_used_in_code_execution:
report.status = "FAIL"
report.status_extended = (
f"Agent {agent.name} uses LLM output directly in code execution — "
"prompt injection can achieve RCE."
)
else:
report.status = "PASS"
report.status_extended = (
f"Agent {agent.name} does not execute LLM output directly in "
"system calls or eval()."
)
findings.append(report)
return findings
@@ -1,36 +0,0 @@
{
"Provider": "aspm",
"CheckID": "aspm_agent_no_cross_cloud_escalation",
"CheckTitle": "AI Agent Must Not Have a Cross-Cloud Identity Chain Enabling Privilege Escalation",
"CheckType": [
"Software and Configuration Checks/AI Agent Security Best Practices"
],
"ServiceName": "attack_paths",
"SubServiceName": "",
"ResourceIdTemplate": "aspm-agent-{id}",
"Severity": "critical",
"ResourceType": "AiAgent",
"Description": "**AI Agent Attack Path**. Detects whether an AI agent's identity in one cloud provider can be used to assume a higher-privileged role in another cloud provider, creating a cross-cloud privilege escalation path. An attacker who compromises the agent can silently pivot between clouds, bypassing the least-privilege controls of each environment in isolation.",
"Risk": "A cross-cloud escalation path allows an attacker with initial access to the agent's lower-privileged identity in one cloud to obtain administrative or broad permissions in a second cloud. This renders the security boundaries of both clouds ineffective and can result in full multi-cloud environment compromise from a single initial foothold.",
"RelatedUrl": "https://owasp.org/www-project-top-10-for-large-language-model-applications/",
"AdditionalURLs": [],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Audit all trust relationships and federation configurations between cloud identities used by this agent.\n2. Remove or tighten cross-cloud trust policies (e.g., AWS IAM OIDC provider conditions, GCP Workload Identity Pool conditions).\n3. Enforce the principle of least privilege independently in every cloud; do not allow a low-privilege identity in Cloud A to assume a high-privilege role in Cloud B.\n4. Implement explicit deny conditions on cross-cloud AssumeRole / impersonation calls that exceed the agent's declared permission scope.\n5. Alert on any new cross-cloud federation configuration changes via CSPM or CIEM tooling.",
"Terraform": ""
},
"Recommendation": {
"Text": "Remove cross-cloud identity trust relationships that allow the agent to escalate privileges beyond its declared scope. Apply equivalent least-privilege constraints to every cloud the agent interacts with, and monitor federation configuration changes continuously.",
"Url": "https://hub.prowler.com/check/aspm_agent_no_cross_cloud_escalation"
}
},
"Categories": [
"trust-boundaries",
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "ASPM-096 attack path check from the Prowler ASPM check suite."
}
@@ -1,41 +0,0 @@
"""ASPM-096: AI agent must not have a cross-cloud identity chain enabling privilege escalation."""
from prowler.lib.check.models import Check, CheckReportASPM
from prowler.providers.aspm.services.attack_paths.attack_paths_client import (
attack_paths_client,
)
class aspm_agent_no_cross_cloud_escalation(Check):
"""Check that AI agents cannot chain identities across cloud providers to escalate privileges.
Cross-cloud identity chaining occurs when an agent's identity in one cloud
can be used to assume a more privileged role in another cloud (e.g., an AWS
IAM role trusted by a GCP service account that has broader permissions).
This creates an attack path that bypasses the least-privilege controls of
either cloud in isolation.
"""
def execute(self) -> list[CheckReportASPM]:
"""Execute the check against all loaded agents.
Returns:
A list of CheckReportASPM findings, one per agent.
"""
findings = []
for agent in attack_paths_client.agents:
report = CheckReportASPM(metadata=self.metadata(), resource=agent)
if agent.attack_paths.cross_cloud_escalation_possible:
report.status = "FAIL"
report.status_extended = (
f"Agent {agent.name} can chain identities across cloud providers "
"to escalate privileges."
)
else:
report.status = "PASS"
report.status_extended = (
f"Agent {agent.name} has no cross-cloud identity chain enabling "
"lateral privilege escalation."
)
findings.append(report)
return findings
@@ -1,36 +0,0 @@
{
"Provider": "aspm",
"CheckID": "aspm_agent_no_lateral_movement_via_shared_infra",
"CheckTitle": "AI Agent Must Not Be Able to Access Shared Infrastructure Used by Sibling Agents",
"CheckType": [
"Software and Configuration Checks/AI Agent Security Best Practices"
],
"ServiceName": "attack_paths",
"SubServiceName": "",
"ResourceIdTemplate": "aspm-agent-{id}",
"Severity": "critical",
"ResourceType": "AiAgent",
"Description": "**AI Agent Attack Path**. Detects whether an AI agent can access infrastructure components — message queues, shared databases, configuration services, secret stores, or file systems — that are also used by sibling agents in the same deployment. Shared infrastructure creates a lateral movement path: compromise of one agent becomes a stepping stone to attacking all agents that share those resources.",
"Risk": "In multi-agent systems, shared infrastructure is the primary lateral movement surface. An attacker who compromises a lower-privilege agent can read messages, inject tasks, or overwrite configuration intended for a higher-privilege sibling agent, effectively taking over the entire agent fleet from a single initial breach point.",
"RelatedUrl": "https://owasp.org/www-project-top-10-for-large-language-model-applications/",
"AdditionalURLs": [],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Assign each agent its own isolated message queue, database schema, and secret store — avoid any shared-resource pattern.\n2. Enforce network-level isolation between agent workloads using security groups, VPC subnet segregation, or Kubernetes NetworkPolicies.\n3. Use separate IAM identities per agent so that an identity compromise cannot access another agent's resources.\n4. Implement cross-agent communication via authenticated and signed API calls rather than shared data stores.\n5. Audit IAM policies and resource policies (SQS, S3, DynamoDB) to ensure no agent can access resources owned by sibling agents.",
"Terraform": ""
},
"Recommendation": {
"Text": "Eliminate shared infrastructure between agents. Provide each agent with dedicated, isolated resources and enforce network and IAM boundaries that prevent any cross-agent resource access. Use authenticated APIs for legitimate inter-agent communication.",
"Url": "https://hub.prowler.com/check/aspm_agent_no_lateral_movement_via_shared_infra"
}
},
"Categories": [
"trust-boundaries",
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "ASPM-099 attack path check from the Prowler ASPM check suite."
}
@@ -1,41 +0,0 @@
"""ASPM-099: AI agent must not be able to access shared infrastructure used by sibling agents."""
from prowler.lib.check.models import Check, CheckReportASPM
from prowler.providers.aspm.services.attack_paths.attack_paths_client import (
attack_paths_client,
)
class aspm_agent_no_lateral_movement_via_shared_infra(Check):
"""Check that AI agents cannot access sibling agent infrastructure or shared credential stores.
When multiple agents share infrastructure components message queues,
databases, secret stores, shared file systems, or configuration services
compromise of one agent can be used as a pivot to attack adjacent agents.
This lateral movement path is especially dangerous in multi-agent
orchestration systems where agent-to-agent trust is implicit.
"""
def execute(self) -> list[CheckReportASPM]:
"""Execute the check against all loaded agents.
Returns:
A list of CheckReportASPM findings, one per agent.
"""
findings = []
for agent in attack_paths_client.agents:
report = CheckReportASPM(metadata=self.metadata(), resource=agent)
if agent.attack_paths.lateral_movement_via_shared_infra:
report.status = "FAIL"
report.status_extended = (
f"Agent {agent.name} can access shared infrastructure used by "
"sibling agents — lateral movement risk."
)
else:
report.status = "PASS"
report.status_extended = (
f"Agent {agent.name} cannot access sibling agent infrastructure "
"or shared credential stores."
)
findings.append(report)
return findings
@@ -1,36 +0,0 @@
{
"Provider": "aspm",
"CheckID": "aspm_agent_tools_cannot_escalate",
"CheckTitle": "AI Agent Tools Must Not Be Abusable to Escalate Beyond Declared Permissions",
"CheckType": [
"Software and Configuration Checks/AI Agent Security Best Practices"
],
"ServiceName": "attack_paths",
"SubServiceName": "",
"ResourceIdTemplate": "aspm-agent-{id}",
"Severity": "critical",
"ResourceType": "AiAgent",
"Description": "**AI Agent Attack Path**. Verifies that the tools granted to an AI agent cannot be chained together or individually abused to perform actions beyond the agent's declared IAM/permission scope. Tool abuse includes prompt injection that redirects tool calls, SSRF through HTTP-fetching tools, insecure deserialization via file-parsing tools, and shell-escape vulnerabilities in code-execution sandboxes.",
"Risk": "When agent tools can be abused, an attacker who controls any part of the agent's input (prompt, retrieved document, API response) can coerce the agent into executing arbitrary commands, exfiltrating data, or invoking cloud APIs far beyond the agent's intended scope — all under the agent's legitimate identity and without triggering IAM-level alerts.",
"RelatedUrl": "https://owasp.org/www-project-top-10-for-large-language-model-applications/",
"AdditionalURLs": [],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Enumerate every tool available to the agent and perform a tool-specific threat model (SSRF, shell injection, insecure deserialization).\n2. Apply input validation and allowlisting to all tool parameters before execution.\n3. Run code-execution tools in strict sandboxes (gVisor, Firecracker, seccomp profiles) with no network access unless explicitly required.\n4. Implement tool call signing and validation so the orchestration layer can detect tampered tool invocations.\n5. Log all tool invocations with full parameter payloads and alert on anomalous combinations.",
"Terraform": ""
},
"Recommendation": {
"Text": "Perform a threat model for every tool granted to the agent. Apply strict input validation, sandboxing, and allowlisted tool parameters. Log and alert on all tool invocations to detect abuse chains at runtime.",
"Url": "https://hub.prowler.com/check/aspm_agent_tools_cannot_escalate"
}
},
"Categories": [
"trust-boundaries",
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "ASPM-097 attack path check from the Prowler ASPM check suite."
}
@@ -1,42 +0,0 @@
"""ASPM-097: AI agent tools must not be abusable to escalate beyond the agent's declared permissions."""
from prowler.lib.check.models import Check, CheckReportASPM
from prowler.providers.aspm.services.attack_paths.attack_paths_client import (
attack_paths_client,
)
class aspm_agent_tools_cannot_escalate(Check):
"""Check that AI agent tools cannot be chained or abused to exceed the agent's declared permissions.
Agents are granted a set of tools (e.g., code-execution sandboxes, shell
utilities, file-system accessors, API wrappers). If those tools can be
composed or abused for example, through prompt injection, insecure
deserialization, or SSRF an attacker can achieve actions far beyond what
the agent's IAM policy technically allows. This check verifies that no
such tool-abuse escalation path has been identified.
"""
def execute(self) -> list[CheckReportASPM]:
"""Execute the check against all loaded agents.
Returns:
A list of CheckReportASPM findings, one per agent.
"""
findings = []
for agent in attack_paths_client.agents:
report = CheckReportASPM(metadata=self.metadata(), resource=agent)
if agent.attack_paths.tool_abuse_escalation_possible:
report.status = "FAIL"
report.status_extended = (
f"Agent {agent.name} tools can be chained or abused to escalate "
"beyond declared permissions."
)
else:
report.status = "PASS"
report.status_extended = (
f"Agent {agent.name} tools cannot be abused to exceed the agent's "
"declared permissions."
)
findings.append(report)
return findings
@@ -1,7 +0,0 @@
"""ASPM Attack Paths service client singleton."""
from prowler.providers.aspm.services.attack_paths.attack_paths_service import (
AttackPaths,
)
attack_paths_client = AttackPaths
@@ -1,20 +0,0 @@
"""ASPM Attack Paths service."""
from prowler.providers.aspm.aspm_provider import AspmProvider
from prowler.providers.aspm.lib.service.service import AspmService
class AttackPaths(AspmService):
"""Service for AI agent attack path analysis.
Inherits the agent list from AspmService and is used by all ASPM attack
path checks (ASPM-096 through ASPM-101).
"""
def __init__(self, provider: AspmProvider) -> None:
"""Initialise the AttackPaths service.
Args:
provider: The active AspmProvider instance.
"""
super().__init__(provider)
@@ -1,35 +0,0 @@
{
"Provider": "aspm",
"CheckID": "aspm_agent_access_control_policy_enforced",
"CheckTitle": "AI agent must have a documented access control policy that is enforced and audited",
"CheckType": [
"Software and Configuration Checks/AI Agent Security Best Practices"
],
"ServiceName": "compliance",
"SubServiceName": "",
"ResourceIdTemplate": "aspm-agent-{id}",
"Severity": "high",
"ResourceType": "AiAgent",
"Description": "**AI Agent Compliance**. Verifies that each AI agent operates under a documented access control policy defining who or what may interact with the agent, what actions are permitted, under which conditions, and that the policy is technically enforced and access events are audited.",
"Risk": "An absent or unenforced access control policy means there is no defined boundary for agent interactions. Unauthorised principals may invoke agent capabilities, access sensitive data processed by the agent, or trigger actions with financial or operational consequences, all without detection.",
"RelatedUrl": "https://owasp.org/www-project-top-10-for-large-language-model-applications/",
"AdditionalURLs": [],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Define a formal access control policy for the agent specifying permitted callers, allowed actions, and applicable conditions.\n2. Implement the policy using IAM roles, API gateway authorisers, or equivalent mechanisms.\n3. Enable audit logging for all access attempts (successful and denied) to an immutable log store.\n4. Review access logs regularly for anomalous patterns.\n5. Review and update the policy at least annually or on personnel/system changes.",
"Terraform": ""
},
"Recommendation": {
"Text": "Document, enforce, and audit an access control policy for each AI agent. Ensure the policy is technically enforced at the API or IAM layer and that access events are captured in an immutable audit log.",
"Url": "https://hub.prowler.com/check/aspm_agent_access_control_policy_enforced"
}
},
"Categories": [
"forensics-ready"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "ASPM-090 check from the Prowler ASPM check suite."
}
@@ -1,40 +0,0 @@
"""ASPM-090: AI agent must have a documented access control policy that is enforced and audited."""
from prowler.lib.check.models import Check, CheckReportASPM
from prowler.providers.aspm.services.compliance.compliance_client import (
compliance_client,
)
class aspm_agent_access_control_policy_enforced(Check):
"""Check that each AI agent has an enforced and audited access control policy.
Access control policies define who or what may interact with an agent, what
actions are permitted, and under which conditions. Without enforcement and
audit trails, unauthorised access to agent capabilities or the data it
processes can go undetected.
"""
def execute(self) -> list[CheckReportASPM]:
"""Execute the check against all loaded agents.
Returns:
A list of CheckReportASPM findings, one per agent.
"""
findings = []
for agent in compliance_client.agents:
report = CheckReportASPM(metadata=self.metadata(), resource=agent)
if not agent.compliance.access_control_policy_enforced:
report.status = "FAIL"
report.status_extended = (
f"Agent {agent.name} lacks an enforced access control policy — "
f"governance controls are absent."
)
else:
report.status = "PASS"
report.status_extended = (
f"Agent {agent.name} has a documented access control policy that "
f"is enforced and audited."
)
findings.append(report)
return findings
@@ -1,35 +0,0 @@
{
"Provider": "aspm",
"CheckID": "aspm_agent_dpia_completed",
"CheckTitle": "AI agent processing personal data must have a completed Data Privacy Impact Assessment",
"CheckType": [
"Software and Configuration Checks/AI Agent Security Best Practices"
],
"ServiceName": "compliance",
"SubServiceName": "",
"ResourceIdTemplate": "aspm-agent-{id}",
"Severity": "critical",
"ResourceType": "AiAgent",
"Description": "**AI Agent Compliance**. Verifies that each AI agent that processes personal data has a completed and documented Data Privacy Impact Assessment (DPIA), identifying privacy risks, documenting mitigations, and demonstrating compliance with GDPR Article 35, CCPA, and equivalent privacy regulations.",
"Risk": "Processing personal data without a DPIA exposes the organisation to GDPR enforcement action including fines of up to 4% of global annual turnover (or EUR 20M, whichever is higher). AI agents often process sensitive personal data as part of their normal operation, making DPIA completion a critical compliance obligation.",
"RelatedUrl": "https://owasp.org/www-project-top-10-for-large-language-model-applications/",
"AdditionalURLs": [],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Determine whether the agent's processing activities meet the GDPR Article 35 threshold for mandatory DPIA (large-scale processing, systematic monitoring, sensitive categories).\n2. Conduct a DPIA using a structured methodology (e.g. CNIL PIA tool, ICO DPIA template).\n3. Document: processing purposes, necessity and proportionality, risks to data subjects, and mitigations.\n4. Consult with your Data Protection Officer (DPO) and, if required, the supervisory authority.\n5. Review and update the DPIA whenever the processing nature or risks change significantly.",
"Terraform": ""
},
"Recommendation": {
"Text": "Complete and document a DPIA for each AI agent that processes personal data before deployment. Involve your Data Protection Officer, document all identified risks and mitigations, and refresh the DPIA on significant changes.",
"Url": "https://hub.prowler.com/check/aspm_agent_dpia_completed"
}
},
"Categories": [
"forensics-ready"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "ASPM-091 check from the Prowler ASPM check suite."
}
@@ -1,38 +0,0 @@
"""ASPM-091: AI agent must have a completed Data Privacy Impact Assessment (DPIA)."""
from prowler.lib.check.models import Check, CheckReportASPM
from prowler.providers.aspm.services.compliance.compliance_client import (
compliance_client,
)
class aspm_agent_dpia_completed(Check):
"""Check that each AI agent has a completed Data Privacy Impact Assessment.
A DPIA is mandatory under GDPR Article 35 for high-risk processing activities
and strongly recommended under CCPA and other privacy regulations. It identifies
privacy risks, documents mitigations, and demonstrates accountability to
supervisory authorities. Failure to complete a DPIA exposes organisations to
significant regulatory fines and reputational damage.
"""
def execute(self) -> list[CheckReportASPM]:
"""Execute the check against all loaded agents.
Returns:
A list of CheckReportASPM findings, one per agent.
"""
findings = []
for agent in compliance_client.agents:
report = CheckReportASPM(metadata=self.metadata(), resource=agent)
if not agent.compliance.dpia_completed:
report.status = "FAIL"
report.status_extended = (
f"Agent {agent.name} processes personal data without a DPIA — "
f"GDPR/CCPA compliance risk."
)
else:
report.status = "PASS"
report.status_extended = f"Agent {agent.name} has a completed Data Privacy Impact Assessment."
findings.append(report)
return findings
@@ -1,35 +0,0 @@
{
"Provider": "aspm",
"CheckID": "aspm_agent_eu_ai_act_controls",
"CheckTitle": "AI agent deployed in the EU must have documented EU AI Act compliance controls",
"CheckType": [
"Software and Configuration Checks/AI Agent Security Best Practices"
],
"ServiceName": "compliance",
"SubServiceName": "",
"ResourceIdTemplate": "aspm-agent-{id}",
"Severity": "high",
"ResourceType": "AiAgent",
"Description": "**AI Agent Compliance**. Verifies that each AI agent has documented compliance controls aligned with the EU AI Act, including transparency obligations, human oversight mechanisms, risk management systems, and conformity assessments where required for high-risk AI systems.",
"Risk": "The EU AI Act imposes binding obligations on providers and deployers of AI systems in the EU market. Non-compliance with requirements for high-risk AI systems can result in fines of up to 3% of global annual turnover (or EUR 15M, whichever is higher), enforcement action, and mandatory market withdrawal.",
"RelatedUrl": "https://owasp.org/www-project-top-10-for-large-language-model-applications/",
"AdditionalURLs": [],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Classify the AI agent under the EU AI Act risk tiers (unacceptable, high, limited, minimal).\n2. For high-risk systems, implement conformity assessment procedures per Annex VI or VII.\n3. Document transparency obligations: inform users they are interacting with an AI system.\n4. Implement human oversight mechanisms allowing intervention and override.\n5. Establish a risk management system covering the entire AI lifecycle.\n6. Maintain technical documentation as required by Article 11 and Annex IV.",
"Terraform": ""
},
"Recommendation": {
"Text": "Classify each AI agent under the EU AI Act, implement required controls for the applicable risk tier, and maintain documentation demonstrating conformity. Engage a qualified legal and technical team to perform a gap analysis.",
"Url": "https://hub.prowler.com/check/aspm_agent_eu_ai_act_controls"
}
},
"Categories": [
"forensics-ready"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "ASPM-088 check from the Prowler ASPM check suite."
}
@@ -1,39 +0,0 @@
"""ASPM-088: AI agent must have EU AI Act compliance controls documented."""
from prowler.lib.check.models import Check, CheckReportASPM
from prowler.providers.aspm.services.compliance.compliance_client import (
compliance_client,
)
class aspm_agent_eu_ai_act_controls(Check):
"""Check that each AI agent has EU AI Act compliance controls documented.
The EU AI Act imposes requirements on high-risk AI systems deployed in the
European Union, including transparency obligations, human oversight mechanisms,
and risk management systems. Agents without documented controls face regulatory
penalties and potential enforcement action.
"""
def execute(self) -> list[CheckReportASPM]:
"""Execute the check against all loaded agents.
Returns:
A list of CheckReportASPM findings, one per agent.
"""
findings = []
for agent in compliance_client.agents:
report = CheckReportASPM(metadata=self.metadata(), resource=agent)
if not agent.compliance.eu_ai_act_controls_present:
report.status = "FAIL"
report.status_extended = (
f"Agent {agent.name} lacks EU AI Act compliance controls — "
f"regulatory risk for EU-deployed agents."
)
else:
report.status = "PASS"
report.status_extended = (
f"Agent {agent.name} has EU AI Act compliance controls documented."
)
findings.append(report)
return findings
@@ -1,35 +0,0 @@
{
"Provider": "aspm",
"CheckID": "aspm_agent_incident_response_plan_exists",
"CheckTitle": "AI agent must have a tested, agent-specific incident response plan",
"CheckType": [
"Software and Configuration Checks/AI Agent Security Best Practices"
],
"ServiceName": "compliance",
"SubServiceName": "",
"ResourceIdTemplate": "aspm-agent-{id}",
"Severity": "high",
"ResourceType": "AiAgent",
"Description": "**AI Agent Compliance**. Verifies that each AI agent has a documented and tested incident response plan covering agent-specific failure modes such as prompt injection attacks, model misbehaviour, autonomous action errors, and credential compromise, including containment, eradication, and recovery procedures.",
"Risk": "Generic IT incident response plans rarely cover AI-specific failure modes. Without an agent-specific plan, teams facing a prompt injection attack or autonomous action incident have no playbook to follow, leading to delayed containment, extended impact, and potentially irreversible consequences from autonomous agent actions.",
"RelatedUrl": "https://owasp.org/www-project-top-10-for-large-language-model-applications/",
"AdditionalURLs": [],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Identify AI-specific incident scenarios: prompt injection, model hallucination causing harm, autonomous action abuse, data exfiltration via LLM output, credential compromise.\n2. For each scenario, document detection signals, containment steps (including agent shutdown procedures), eradication, and recovery.\n3. Define roles and responsibilities, escalation paths, and communication templates.\n4. Integrate the plan with the organisation's overarching incident response procedure.\n5. Conduct tabletop exercises at least annually to validate the plan.\n6. Update the plan after every significant incident or architectural change.",
"Terraform": ""
},
"Recommendation": {
"Text": "Develop and test an agent-specific incident response plan covering AI failure modes. Conduct annual tabletop exercises and update the plan after incidents or significant architectural changes.",
"Url": "https://hub.prowler.com/check/aspm_agent_incident_response_plan_exists"
}
},
"Categories": [
"forensics-ready"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "ASPM-093 check from the Prowler ASPM check suite."
}
@@ -1,37 +0,0 @@
"""ASPM-093: AI agent must have a tested, agent-specific incident response plan."""
from prowler.lib.check.models import Check, CheckReportASPM
from prowler.providers.aspm.services.compliance.compliance_client import (
compliance_client,
)
class aspm_agent_incident_response_plan_exists(Check):
"""Check that each AI agent has a tested, agent-specific incident response plan.
AI agents introduce novel failure modes including prompt injection, model
misbehaviour, and autonomous action errors that may not be covered by generic
IT incident response plans. An agent-specific plan ensures teams know how to
contain, eradicate, and recover from agent-related incidents swiftly.
"""
def execute(self) -> list[CheckReportASPM]:
"""Execute the check against all loaded agents.
Returns:
A list of CheckReportASPM findings, one per agent.
"""
findings = []
for agent in compliance_client.agents:
report = CheckReportASPM(metadata=self.metadata(), resource=agent)
if not agent.compliance.incident_response_plan_exists:
report.status = "FAIL"
report.status_extended = (
f"Agent {agent.name} has no incident response plan — the organisation "
f"is unprepared for agent-related incidents."
)
else:
report.status = "PASS"
report.status_extended = f"Agent {agent.name} has a tested, agent-specific incident response plan."
findings.append(report)
return findings
@@ -1,35 +0,0 @@
{
"Provider": "aspm",
"CheckID": "aspm_agent_nist_ai_rmf_assessed",
"CheckTitle": "AI agent must be assessed against the NIST AI Risk Management Framework",
"CheckType": [
"Software and Configuration Checks/AI Agent Security Best Practices"
],
"ServiceName": "compliance",
"SubServiceName": "",
"ResourceIdTemplate": "aspm-agent-{id}",
"Severity": "high",
"ResourceType": "AiAgent",
"Description": "**AI Agent Compliance**. Verifies that each AI agent has been assessed against the NIST AI Risk Management Framework (AI RMF 1.0), covering the GOVERN, MAP, MEASURE, and MANAGE functions, to ensure structured risk identification, measurement, and mitigation across the AI lifecycle.",
"Risk": "Without an NIST AI RMF assessment, organisations lack a structured approach to identifying and managing AI-specific risks including bias, robustness failures, and security vulnerabilities. This can lead to undiscovered risks becoming operational incidents, particularly for agents in sensitive or regulated environments.",
"RelatedUrl": "https://owasp.org/www-project-top-10-for-large-language-model-applications/",
"AdditionalURLs": [],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Obtain the NIST AI RMF 1.0 publication from nist.gov/artificial-intelligence.\n2. Perform a structured assessment covering GOVERN (policies), MAP (context and risk identification), MEASURE (risk analysis), and MANAGE (risk treatment).\n3. Document outcomes and assign risk owners for each identified AI risk.\n4. Integrate AI RMF assessment into the agent's development and deployment lifecycle.\n5. Schedule periodic re-assessment (at least annually or on major model changes).",
"Terraform": ""
},
"Recommendation": {
"Text": "Conduct and document an NIST AI RMF assessment for each AI agent, covering all four core functions. Use the assessment outputs to drive risk treatment decisions and governance improvements.",
"Url": "https://hub.prowler.com/check/aspm_agent_nist_ai_rmf_assessed"
}
},
"Categories": [
"forensics-ready"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "ASPM-089 check from the Prowler ASPM check suite."
}
@@ -1,39 +0,0 @@
"""ASPM-089: AI agent must be assessed against the NIST AI Risk Management Framework."""
from prowler.lib.check.models import Check, CheckReportASPM
from prowler.providers.aspm.services.compliance.compliance_client import (
compliance_client,
)
class aspm_agent_nist_ai_rmf_assessed(Check):
"""Check that each AI agent has been assessed against the NIST AI RMF.
The NIST AI Risk Management Framework (AI RMF 1.0) provides guidance to
organisations for managing risks associated with AI systems across the
entire AI lifecycle. Assessment against the framework demonstrates that
governance, mapping, measurement, and management functions are in place.
"""
def execute(self) -> list[CheckReportASPM]:
"""Execute the check against all loaded agents.
Returns:
A list of CheckReportASPM findings, one per agent.
"""
findings = []
for agent in compliance_client.agents:
report = CheckReportASPM(metadata=self.metadata(), resource=agent)
if not agent.compliance.nist_ai_rmf_assessed:
report.status = "FAIL"
report.status_extended = (
f"Agent {agent.name} has not been assessed against the NIST AI RMF."
)
else:
report.status = "PASS"
report.status_extended = (
f"Agent {agent.name} has been assessed against the NIST AI Risk "
f"Management Framework."
)
findings.append(report)
return findings
@@ -1,35 +0,0 @@
{
"Provider": "aspm",
"CheckID": "aspm_agent_owasp_llm_top10_assessed",
"CheckTitle": "AI agent must be assessed against the OWASP LLM Top 10 with documented mitigations",
"CheckType": [
"Software and Configuration Checks/AI Agent Security Best Practices"
],
"ServiceName": "compliance",
"SubServiceName": "",
"ResourceIdTemplate": "aspm-agent-{id}",
"Severity": "high",
"ResourceType": "AiAgent",
"Description": "**AI Agent Compliance**. Verifies that each AI agent has been formally assessed against the OWASP Top 10 for Large Language Model Applications and that mitigations are documented for all applicable risks, including prompt injection, insecure output handling, training data poisoning, and model denial of service.",
"Risk": "Without an OWASP LLM Top 10 assessment, common and well-understood attack vectors against AI agents remain unaddressed. Prompt injection and insecure output handling are regularly exploited in the wild and can lead to data exfiltration, privilege escalation, or complete agent compromise.",
"RelatedUrl": "https://owasp.org/www-project-top-10-for-large-language-model-applications/",
"AdditionalURLs": [],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Download the OWASP Top 10 for LLM Applications from owasp.org.\n2. Conduct a structured assessment of each LLM-powered agent against all 10 categories.\n3. Document findings and applicable risks in the agent's security design document.\n4. Implement mitigations (input validation, output encoding, rate limiting, etc.) for each applicable risk.\n5. Schedule re-assessment annually or on significant model or architecture changes.",
"Terraform": ""
},
"Recommendation": {
"Text": "Perform and document an OWASP LLM Top 10 assessment for each AI agent, including risk ratings and implemented mitigations. Integrate assessment checkpoints into the AI agent development lifecycle.",
"Url": "https://hub.prowler.com/check/aspm_agent_owasp_llm_top10_assessed"
}
},
"Categories": [
"forensics-ready"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "ASPM-087 check from the Prowler ASPM check suite."
}
@@ -1,37 +0,0 @@
"""ASPM-087: AI agent must be assessed against the OWASP LLM Top 10."""
from prowler.lib.check.models import Check, CheckReportASPM
from prowler.providers.aspm.services.compliance.compliance_client import (
compliance_client,
)
class aspm_agent_owasp_llm_top10_assessed(Check):
"""Check that each AI agent has been assessed against the OWASP LLM Top 10.
The OWASP LLM Top 10 identifies the most critical security risks for
applications using large language models. An assessment ensures that common
attack vectors such as prompt injection, insecure output handling, and
training data poisoning are actively mitigated.
"""
def execute(self) -> list[CheckReportASPM]:
"""Execute the check against all loaded agents.
Returns:
A list of CheckReportASPM findings, one per agent.
"""
findings = []
for agent in compliance_client.agents:
report = CheckReportASPM(metadata=self.metadata(), resource=agent)
if not agent.compliance.owasp_llm_top10_assessed:
report.status = "FAIL"
report.status_extended = f"Agent {agent.name} has not been assessed against the OWASP LLM Top 10."
else:
report.status = "PASS"
report.status_extended = (
f"Agent {agent.name} has been assessed against the OWASP LLM Top 10 "
f"with documented mitigations."
)
findings.append(report)
return findings

Some files were not shown because too many files have changed in this diff Show More