Compare commits

..

26 Commits

Author SHA1 Message Date
Prowler Bot a70f0652b6 fix(ui): hide line numbers in CLI command remediation block (#11061)
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
2026-05-06 15:06:00 +01:00
Prowler Bot fae4fbc0ae fix: PR number in changelog entry for #10529 (#11058)
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
2026-05-06 11:56:21 +01:00
Prowler Bot bbe45ed708 fix(oci): scan identity in known valid region (#11056)
Co-authored-by: rchotacode <32524742+rchotacode@users.noreply.github.com>
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
2026-05-06 11:44:26 +01:00
Prowler Bot 6b6d22bb31 chore(api): Bump version to v1.26.3 (#10996)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-05-05 10:45:57 +02:00
Prowler Bot a3b4f94368 chore(sdk): Bump version to v5.25.3 (#10994)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-05-05 10:45:35 +02:00
Prowler Bot 178cdb1b57 chore(ui): Bump version to v5.25.3 (#10995)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-05-05 10:44:56 +02:00
Prowler Bot d58343e11f chore(changelog): prepare for v5.25.2 (#10992)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-05-05 08:51:50 +02:00
Prowler Bot 952ca2d505 fix(sdk): cover CNAME → dangling S3 in route53 takeover check (#10990)
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-05-05 08:47:00 +02:00
Prowler Bot 9de9a26821 fix(k8s): match RBAC rules by apiGroup, not just core (#10988)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
Co-authored-by: Andoni A. <14891798+andoniaf@users.noreply.github.com>
2026-05-04 19:59:31 +02:00
Prowler Bot e4da9741b2 fix(timeline): Return a compact actor name from CloudTrail events (#10987)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-05-04 19:41:36 +02:00
Prowler Bot 35e867e4f5 fix(k8s): deduplicate RBAC findings by unique subject (#10984)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-05-04 18:22:56 +02:00
Prowler Bot 0719f69828 fix(ui): compliance card layout polish (#10977)
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
2026-05-04 15:53:58 +01:00
Prowler Bot b7ee0ce9b1 fix(ui): clean up findings expanded resource row layout (#10973)
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
2026-05-04 14:59:06 +01:00
Prowler Bot 53f6cb52cb chore(ui): Bump version to v5.25.2 (#10941)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-04-29 18:44:39 +02:00
Prowler Bot 429c5f6789 chore(sdk): Bump version to v5.25.2 (#10943)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-04-29 18:44:30 +02:00
Prowler Bot 592bc4a944 chore(api): Bump version to v1.26.2 (#10942)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-04-29 18:44:19 +02:00
Prowler Bot bfdacf3f25 fix(ui): reposition compliance card export menu (#10933) 2026-04-29 14:18:07 +02:00
Prowler Bot adc1dbfe7c chore: changelog v5.25.1 (#10935)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-04-29 14:03:15 +02:00
Prowler Bot 1b8b5cd18c fix(kubernetes): use cluster name as provider_uid in OCSF output (#10932)
Co-authored-by: Andoni Alonso <14891798+andoniaf@users.noreply.github.com>
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-04-29 13:52:53 +02:00
Prowler Bot be94b97e49 fix(api): redirect scan report and compliance downloads to presigned S3 URLs (#10931)
Co-authored-by: Josema Camacho <josema@prowler.com>
2026-04-29 13:34:04 +02:00
Prowler Bot 9840fa640b fix(api): Attack Paths AWS region fallback and stale SCHEDULED cleanup (#10930)
Co-authored-by: Josema Camacho <josema@prowler.com>
2026-04-29 13:14:53 +02:00
Prowler Bot 0aa7b84be3 fix(cli): generate compliance after scan (#10922)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-04-28 17:26:34 +02:00
Prowler Bot bfa8e811d1 chore(ui): Bump version to v5.25.1 (#10914)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-04-28 12:35:41 +02:00
Prowler Bot 1c29521ebd chore(sdk): Bump version to v5.25.1 (#10911)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-04-28 12:35:04 +02:00
Prowler Bot b5abea3e45 chore(api): Bump version to v1.26.1 (#10915)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-04-28 12:34:52 +02:00
Prowler Bot 68eb946326 chore(api): Update prowler dependency to v5.25 for release 5.25.0 (#10906)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-04-28 11:00:51 +02:00
229 changed files with 3295 additions and 15555 deletions
+1 -1
View File
@@ -145,7 +145,7 @@ SENTRY_RELEASE=local
NEXT_PUBLIC_SENTRY_ENVIRONMENT=${SENTRY_ENVIRONMENT}
#### Prowler release version ####
NEXT_PUBLIC_PROWLER_RELEASE_VERSION=v5.26.0
NEXT_PUBLIC_PROWLER_RELEASE_VERSION=v5.25.3
# Social login credentials
SOCIAL_GOOGLE_OAUTH_CALLBACK_URL="${AUTH_URL}/api/auth/callback/google"
@@ -1,143 +0,0 @@
name: "🔎 New Check Request"
description: Request a new Prowler security check
title: "[New Check]: "
labels: ["feature-request", "status/needs-triage"]
body:
- type: checkboxes
id: search
attributes:
label: Existing check search
description: Confirm this check does not already exist before opening a new request.
options:
- label: I have searched existing issues, Prowler Hub, and the public roadmap, and this check does not already exist.
required: true
- type: markdown
attributes:
value: |
Use this form to describe the security condition that Prowler should evaluate.
The most useful inputs for [Prowler Studio](https://github.com/prowler-cloud/prowler-studio) are:
- What should be detected
- What PASS and FAIL mean
- Vendor docs, API references, SDK methods, CLI commands, or reference code
- type: dropdown
id: provider
attributes:
label: Provider
description: Cloud or platform this check targets.
options:
- AWS
- Azure
- GCP
- Kubernetes
- GitHub
- Microsoft 365
- OCI
- Alibaba Cloud
- Cloudflare
- MongoDB Atlas
- Google Workspace
- OpenStack
- Vercel
- NHN
- Other / New provider
validations:
required: true
- type: input
id: other_provider_name
attributes:
label: New provider name
description: Only fill this if you selected "Other / New provider" above.
placeholder: "NewProviderName"
validations:
required: false
- type: input
id: service_name
attributes:
label: Service or product area
description: Optional. Main service, product, or feature to audit.
placeholder: "s3, bedrock, entra, repository, apiserver"
validations:
required: false
- type: input
id: suggested_check_name
attributes:
label: Suggested check name
description: Optional. Use `snake_case` following `<service>_<resource>_<best_practice>`, with lowercase letters and underscores only.
placeholder: "bedrock_guardrail_sensitive_information_filter_enabled"
validations:
required: false
- type: textarea
id: context
attributes:
label: Context and goal
description: Describe the security problem, why it matters, and what this new check should help detect.
placeholder: |-
- Security condition to validate:
- Why it matters:
- Resource, feature, or configuration involved:
validations:
required: true
- type: textarea
id: expected_behavior
attributes:
label: Expected behavior
description: Explain what the check should evaluate and what PASS, FAIL, or MANUAL should mean.
placeholder: |-
- Resource or scope to evaluate:
- PASS when:
- FAIL when:
- MANUAL when (if applicable):
- Exclusions, thresholds, or edge cases:
validations:
required: true
- type: textarea
id: references
attributes:
label: References
description: Add vendor docs, API references, SDK methods, CLI commands, endpoint docs, sample payloads, or similar reference material.
placeholder: |-
- Product or service documentation:
- API or SDK reference:
- CLI command or endpoint documentation:
- Sample payload or response:
- Security advisory or benchmark:
validations:
required: true
- type: dropdown
id: severity
attributes:
label: Suggested severity
description: Your best estimate. Reviewers will confirm during triage.
options:
- Critical
- High
- Medium
- Low
- Informational
- Not sure
validations:
required: true
- type: textarea
id: implementation_notes
attributes:
label: Additional implementation notes
description: Optional. Add permissions, unsupported regions, config knobs, product limitations, or anything else that may affect implementation.
placeholder: |-
- Required permissions or scopes:
- Region, tenant, or subscription limitations:
- Configurable behavior or thresholds:
- Other constraints:
validations:
required: false
+1 -1
View File
@@ -62,7 +62,7 @@ jobs:
"Alan-TheGentleman"
"alejandrobailo"
"amitsharm"
# "andoniaf"
"andoniaf"
"cesararroba"
"danibarranqueroo"
"HugoPBrito"
+2 -2
View File
@@ -209,11 +209,11 @@ jobs:
echo "AWS service_paths='${STEPS_AWS_SERVICES_OUTPUTS_SERVICE_PATHS}'"
if [ "${STEPS_AWS_SERVICES_OUTPUTS_RUN_ALL}" = "true" ]; then
poetry run pytest -n auto --cov=./prowler/providers/aws --cov-report=xml:aws_coverage.xml tests/providers/aws
poetry run pytest -p no:randomly -n auto --cov=./prowler/providers/aws --cov-report=xml:aws_coverage.xml tests/providers/aws
elif [ -z "${STEPS_AWS_SERVICES_OUTPUTS_SERVICE_PATHS}" ]; then
echo "No AWS service paths detected; skipping AWS tests."
else
poetry run pytest -n auto --cov=./prowler/providers/aws --cov-report=xml:aws_coverage.xml ${STEPS_AWS_SERVICES_OUTPUTS_SERVICE_PATHS}
poetry run pytest -p no:randomly -n auto --cov=./prowler/providers/aws --cov-report=xml:aws_coverage.xml ${STEPS_AWS_SERVICES_OUTPUTS_SERVICE_PATHS}
fi
env:
STEPS_AWS_SERVICES_OUTPUTS_RUN_ALL: ${{ steps.aws-services.outputs.run_all }}
-20
View File
@@ -42,8 +42,6 @@ jobs:
fonts.gstatic.com:443
api.github.com:443
release-assets.githubusercontent.com:443
cdn.playwright.dev:443
objects.githubusercontent.com:443
- name: Checkout repository
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
@@ -154,24 +152,6 @@ jobs:
echo "Only test files changed - running ALL unit tests"
pnpm run test:run
- name: Cache Playwright browsers
if: steps.check-changes.outputs.any_changed == 'true'
id: playwright-cache
uses: actions/cache@668228422ae6a00e4ad889ee87cd7109ec5666a7 # v5.0.4
with:
path: ~/.cache/ms-playwright
key: ${{ runner.os }}-playwright-chromium-${{ hashFiles('ui/pnpm-lock.yaml') }}
restore-keys: |
${{ runner.os }}-playwright-chromium-
- name: Install Playwright Chromium browser
if: steps.check-changes.outputs.any_changed == 'true' && steps.playwright-cache.outputs.cache-hit != 'true'
run: pnpm exec playwright install chromium
- name: Run browser tests
if: steps.check-changes.outputs.any_changed == 'true'
run: pnpm run test:browser
- name: Build application
if: steps.check-changes.outputs.any_changed == 'true'
run: pnpm run build
-36
View File
@@ -1,34 +1,17 @@
# Priority tiers (lower = runs first, same priority = concurrent):
# P0 — fast file fixers
# P10 — validators and guards
# P20 — auto-formatters
# P30 — linters
# P40 — security scanners
# P50 — dependency validation
default_install_hook_types: [pre-commit, pre-push]
repos:
## GENERAL (prek built-in — no external repo needed)
- repo: builtin
hooks:
- id: check-merge-conflict
priority: 10
- id: check-yaml
args: ["--allow-multiple-documents"]
exclude: (prowler/config/llm_config.yaml|contrib/)
priority: 10
- id: check-json
priority: 10
- id: end-of-file-fixer
priority: 0
- id: trailing-whitespace
priority: 0
- id: no-commit-to-branch
priority: 10
- id: pretty-format-json
args: ["--autofix", --no-sort-keys, --no-ensure-ascii]
priority: 10
## TOML
- repo: https://github.com/macisamuele/language-formatters-pre-commit-hooks
@@ -37,7 +20,6 @@ repos:
- id: pretty-format-toml
args: [--autofix]
files: pyproject.toml
priority: 20
## GITHUB ACTIONS
- repo: https://github.com/zizmorcore/zizmor-pre-commit
@@ -45,7 +27,6 @@ repos:
hooks:
- id: zizmor
files: ^\.github/
priority: 30
## BASH
- repo: https://github.com/koalaman/shellcheck-precommit
@@ -53,7 +34,6 @@ repos:
hooks:
- id: shellcheck
exclude: contrib
priority: 30
## PYTHON — SDK (prowler/, tests/, dashboard/, util/, scripts/)
- repo: https://github.com/myint/autoflake
@@ -68,7 +48,6 @@ repos:
"--remove-all-unused-imports",
"--remove-unused-variable",
]
priority: 20
- repo: https://github.com/pycqa/isort
rev: 8.0.1
@@ -77,7 +56,6 @@ repos:
name: "SDK - isort"
files: { glob: ["{prowler,tests,dashboard,util,scripts}/**/*.py"] }
args: ["--profile", "black"]
priority: 20
- repo: https://github.com/psf/black
rev: 26.3.1
@@ -85,7 +63,6 @@ repos:
- id: black
name: "SDK - black"
files: { glob: ["{prowler,tests,dashboard,util,scripts}/**/*.py"] }
priority: 20
- repo: https://github.com/pycqa/flake8
rev: 7.3.0
@@ -94,7 +71,6 @@ repos:
name: "SDK - flake8"
files: { glob: ["{prowler,tests,dashboard,util,scripts}/**/*.py"] }
args: ["--ignore=E266,W503,E203,E501,W605"]
priority: 30
## PYTHON — API + MCP Server (ruff)
- repo: https://github.com/astral-sh/ruff-pre-commit
@@ -104,11 +80,9 @@ repos:
name: "API + MCP - ruff check"
files: { glob: ["{api,mcp_server}/**/*.py"] }
args: ["--fix"]
priority: 30
- id: ruff-format
name: "API + MCP - ruff format"
files: { glob: ["{api,mcp_server}/**/*.py"] }
priority: 20
## PYTHON — Poetry
- repo: https://github.com/python-poetry/poetry
@@ -119,28 +93,24 @@ repos:
args: ["--directory=./api"]
files: { glob: ["api/{pyproject.toml,poetry.lock}"] }
pass_filenames: false
priority: 50
- id: poetry-lock
name: API - poetry-lock
args: ["--directory=./api"]
files: { glob: ["api/{pyproject.toml,poetry.lock}"] }
pass_filenames: false
priority: 50
- id: poetry-check
name: SDK - poetry-check
args: ["--directory=./"]
files: { glob: ["{pyproject.toml,poetry.lock}"] }
pass_filenames: false
priority: 50
- id: poetry-lock
name: SDK - poetry-lock
args: ["--directory=./"]
files: { glob: ["{pyproject.toml,poetry.lock}"] }
pass_filenames: false
priority: 50
## CONTAINERS
- repo: https://github.com/hadolint/hadolint
@@ -148,7 +118,6 @@ repos:
hooks:
- id: hadolint
args: ["--ignore=DL3013"]
priority: 30
## LOCAL HOOKS
- repo: local
@@ -159,7 +128,6 @@ repos:
language: system
types: [python]
files: { glob: ["{prowler,tests,dashboard,util,scripts}/**/*.py"] }
priority: 30
- id: trufflehog
name: TruffleHog
@@ -170,7 +138,6 @@ repos:
language: system
pass_filenames: false
stages: ["pre-commit", "pre-push"]
priority: 40
- id: bandit
name: bandit
@@ -181,7 +148,6 @@ repos:
files: '.*\.py'
exclude:
{ glob: ["{contrib,skills}/**", "**/.venv/**", "**/*_test.py"] }
priority: 40
- id: safety
name: safety
@@ -200,7 +166,6 @@ repos:
".safety-policy.yml",
],
}
priority: 40
- id: vulture
name: vulture
@@ -209,4 +174,3 @@ repos:
language: system
types: [python]
files: '.*\.py'
priority: 40
+12 -12
View File
@@ -104,22 +104,22 @@ Every AWS provider scan will enqueue an Attack Paths ingestion job automatically
| Provider | Checks | Services | [Compliance Frameworks](https://docs.prowler.com/projects/prowler-open-source/en/latest/tutorials/compliance/) | [Categories](https://docs.prowler.com/projects/prowler-open-source/en/latest/tutorials/misc/#categories) | Support | Interface |
|---|---|---|---|---|---|---|
| AWS | 595 | 84 | 43 | 17 | Official | UI, API, CLI |
| Azure | 167 | 22 | 19 | 16 | Official | UI, API, CLI |
| GCP | 102 | 18 | 17 | 12 | Official | UI, API, CLI |
| Kubernetes | 83 | 7 | 7 | 11 | Official | UI, API, CLI |
| GitHub | 24 | 3 | 1 | 5 | Official | UI, API, CLI |
| M365 | 101 | 10 | 4 | 10 | Official | UI, API, CLI |
| OCI | 51 | 14 | 4 | 10 | Official | UI, API, CLI |
| Alibaba Cloud | 61 | 9 | 4 | 9 | Official | UI, API, CLI |
| Cloudflare | 29 | 3 | 0 | 5 | Official | UI, API, CLI |
| AWS | 572 | 83 | 41 | 17 | Official | UI, API, CLI |
| Azure | 165 | 20 | 18 | 13 | Official | UI, API, CLI |
| GCP | 100 | 13 | 15 | 11 | Official | UI, API, CLI |
| Kubernetes | 83 | 7 | 7 | 9 | Official | UI, API, CLI |
| GitHub | 21 | 2 | 1 | 2 | Official | UI, API, CLI |
| M365 | 89 | 9 | 4 | 5 | Official | UI, API, CLI |
| OCI | 48 | 13 | 3 | 10 | Official | UI, API, CLI |
| Alibaba Cloud | 61 | 9 | 3 | 9 | Official | UI, API, CLI |
| Cloudflare | 29 | 2 | 0 | 5 | Official | UI, API, CLI |
| IaC | [See `trivy` docs.](https://trivy.dev/latest/docs/coverage/iac/) | N/A | N/A | N/A | Official | UI, API, CLI |
| MongoDB Atlas | 10 | 3 | 0 | 8 | Official | UI, API, CLI |
| LLM | [See `promptfoo` docs.](https://www.promptfoo.dev/docs/red-team/plugins/) | N/A | N/A | N/A | Official | CLI |
| Image | N/A | N/A | N/A | N/A | Official | CLI, API |
| Google Workspace | 25 | 4 | 2 | 4 | Official | CLI |
| OpenStack | 34 | 5 | 0 | 9 | Official | UI, API, CLI |
| Vercel | 26 | 6 | 0 | 5 | Official | CLI |
| Google Workspace | 1 | 1 | 0 | 1 | Official | CLI |
| OpenStack | 27 | 4 | 0 | 8 | Official | UI, API, CLI |
| Vercel | 30 | 6 | 0 | 5 | Official | CLI |
| NHN | 6 | 2 | 1 | 0 | Unofficial | CLI |
> [!Note]
-8
View File
@@ -2,14 +2,6 @@
All notable changes to the **Prowler API** are documented in this file.
## [1.27.0] (Prowler UNRELEASED)
### 🚀 Added
- `scan-reset-ephemeral-resources` post-scan task zeroes `failed_findings_count` for resources missing from the latest full-scope scan, keeping ephemeral resources from polluting the Resources page sort [(#10929)](https://github.com/prowler-cloud/prowler/pull/10929)
---
## [1.26.1] (Prowler v5.25.1)
### 🐞 Fixed
+3 -3
View File
@@ -6754,8 +6754,8 @@ uuid6 = "2024.7.10"
[package.source]
type = "git"
url = "https://github.com/prowler-cloud/prowler.git"
reference = "master"
resolved_reference = "ca29e354b622198ff6a70e2ea5eb04e4a44a0903"
reference = "v5.25"
resolved_reference = "e252058af491b41608dbaaba2975acd7c1728174"
[[package]]
name = "psutil"
@@ -9424,4 +9424,4 @@ files = [
[metadata]
lock-version = "2.1"
python-versions = ">=3.11,<3.13"
content-hash = "a3ab982d11a87d951ff15694d2ca7fd51f1f51a451abb0baa067ccf6966367a8"
content-hash = "7446e89a46709f976a572231862072de86e7bf01ed90a72bea526b9ab05a82b3"
+2 -2
View File
@@ -25,7 +25,7 @@ dependencies = [
"defusedxml==0.7.1",
"gunicorn==23.0.0",
"lxml==5.3.2",
"prowler @ git+https://github.com/prowler-cloud/prowler.git@master",
"prowler @ git+https://github.com/prowler-cloud/prowler.git@v5.25",
"psycopg2-binary==2.9.9",
"pytest-celery[redis] (==1.3.0)",
"sentry-sdk[django] (==2.56.0)",
@@ -50,7 +50,7 @@ name = "prowler-api"
package-mode = false
# Needed for the SDK compatibility
requires-python = ">=3.11,<3.13"
version = "1.27.0"
version = "1.26.3"
[project.scripts]
celery = "src.backend.config.settings.celery"
-48
View File
@@ -595,40 +595,10 @@ class Scan(RowLevelSecurityProtectedModel):
objects = ActiveProviderManager()
all_objects = models.Manager()
_SCOPING_SCANNER_ARG_KEYS_CACHE: tuple[str, ...] | None = None
@classmethod
def get_scoping_scanner_arg_keys(cls) -> tuple[str, ...]:
"""Return the scanner_args keys that mark a scan as scoped.
Derived from ``prowler.lib.scan.scan.Scan.__init__`` so the API stays
in sync with whatever the SDK actually accepts as filters. Cached at
class level — the signature is stable for the process lifetime.
"""
if cls._SCOPING_SCANNER_ARG_KEYS_CACHE is None:
import inspect
from prowler.lib.scan.scan import Scan as ProwlerScan
params = inspect.signature(ProwlerScan.__init__).parameters
cls._SCOPING_SCANNER_ARG_KEYS_CACHE = tuple(
name for name in params if name not in ("self", "provider")
)
return cls._SCOPING_SCANNER_ARG_KEYS_CACHE
class TriggerChoices(models.TextChoices):
SCHEDULED = "scheduled", _("Scheduled")
MANUAL = "manual", _("Manual")
# Trigger values for scans that ran the SDK end-to-end. Imported scans (or
# any future trigger) are intentionally NOT in this set — they may carry
# only a partial slice of resources, so post-scan logic that depends on a
# full-scope sweep (e.g. resetting ephemeral resource findings) must skip
# them by default.
LIVE_SCAN_TRIGGERS = frozenset(
(TriggerChoices.SCHEDULED.value, TriggerChoices.MANUAL.value)
)
id = models.UUIDField(primary_key=True, default=uuid7, editable=False)
name = models.CharField(
blank=True, null=True, max_length=100, validators=[MinLengthValidator(3)]
@@ -711,24 +681,6 @@ class Scan(RowLevelSecurityProtectedModel):
class JSONAPIMeta:
resource_name = "scans"
def is_full_scope(self) -> bool:
"""Return True if this scan ran with no scoping filters at all.
Used to gate post-scan operations (such as resetting the
failed_findings_count of resources missing from the scan) that are only
safe when the scan covered every check, service, and category. Imported
scans are NOT full-scope by definition — they may carry only a partial
slice of resources, so they're rejected via ``trigger`` even before the
scanner_args check.
"""
if self.trigger not in self.LIVE_SCAN_TRIGGERS:
return False
scanner_args = self.scanner_args or {}
for key in self.get_scoping_scanner_arg_keys():
if scanner_args.get(key):
return False
return True
class AttackPathsScan(RowLevelSecurityProtectedModel):
objects = ActiveProviderManager()
+1 -1
View File
@@ -1,7 +1,7 @@
openapi: 3.0.3
info:
title: Prowler API
version: 1.27.0
version: 1.26.3
description: |-
Prowler API specification.
+1 -1
View File
@@ -422,7 +422,7 @@ class SchemaView(SpectacularAPIView):
def get(self, request, *args, **kwargs):
spectacular_settings.TITLE = "Prowler API"
spectacular_settings.VERSION = "1.27.0"
spectacular_settings.VERSION = "1.26.3"
spectacular_settings.DESCRIPTION = (
"Prowler API specification.\n\nThis file is auto-generated."
)
+2 -181
View File
@@ -10,29 +10,16 @@ from typing import Any
import sentry_sdk
from celery.utils.log import get_task_logger
from config.django.base import DJANGO_FINDINGS_BATCH_SIZE
from config.env import env
from config.settings.celery import CELERY_DEADLOCK_ATTEMPTS
from django.db import IntegrityError, OperationalError
from django.db.models import (
Case,
Count,
Exists,
IntegerField,
Max,
Min,
OuterRef,
Prefetch,
Q,
Sum,
When,
)
from django.db.models import Case, Count, IntegerField, Max, Min, Prefetch, Q, Sum, When
from django.utils import timezone as django_timezone
from tasks.jobs.queries import (
COMPLIANCE_UPSERT_PROVIDER_SCORE_SQL,
COMPLIANCE_UPSERT_TENANT_SUMMARY_SQL,
)
from tasks.utils import CustomEncoder, batched
from tasks.utils import CustomEncoder
from api.compliance import PROWLER_COMPLIANCE_OVERVIEW_TEMPLATE
from api.constants import SEVERITY_ORDER
@@ -2082,169 +2069,3 @@ def aggregate_finding_group_summaries(tenant_id: str, scan_id: str):
"created": created_count,
"updated": updated_count,
}
def reset_ephemeral_resource_findings_count(tenant_id: str, scan_id: str) -> dict:
"""Zero failed_findings_count for resources missing from a completed full-scope scan.
Resources that exist in the database for the scan's provider but were not
touched by this scan are treated as ephemeral. We keep their historical
findings, but reset the denormalized counter that drives the Resources page
sort so they stop ranking at the top.
Skipped (no-op) when:
- The scan is not in COMPLETED state.
- The scan ran with any scoping filter in scanner_args (partial scope).
Query design (must scale to 500k+ resources per provider):
Phase 1 — collect ephemeral IDs with one anti-join read.
Outer filter ``(tenant_id, provider_id, failed_findings_count > 0)``
uses ``resources_tenant_provider_idx``. The correlated
``NOT EXISTS`` subquery hits the implicit unique index
``(tenant_id, scan_id, resource_id)`` on ``ResourceScanSummary``.
``NOT EXISTS`` (vs ``NOT IN``) is null-safe and lets the planner
choose between hash anti-join and indexed nested-loop anti-join.
``.iterator(chunk_size=...)`` skips the queryset cache so memory
stays bounded while streaming UUIDs.
Phase 2 — UPDATE in fixed-size batches.
One large UPDATE would hold row-exclusive locks for seconds and
create a WAL spike. Batched UPDATEs by ``id__in`` (~1k rows each)
hit the primary key, keep each lock window ~50ms, bound WAL chunks,
and let other writers proceed between batches.
``failed_findings_count__gt=0`` in the UPDATE is idempotent under
concurrent scans and skips no-op rewrites.
Reads use the primary DB, not the replica: ``ResourceScanSummary`` rows
were written by the same scan task that triggered this one, so replica
lag could falsely classify scanned resources as ephemeral.
Scope detection (``Scan.is_full_scope()``) derives the set of scoping
scanner_args from ``prowler.lib.scan.scan.Scan.__init__`` via
introspection, so the API can never drift from the SDK's filter
contract. Imported scans are also rejected by trigger — they may only
cover a partial slice of resources.
"""
with rls_transaction(tenant_id):
scan = Scan.objects.filter(tenant_id=tenant_id, id=scan_id).first()
if scan is None:
logger.warning(f"Scan {scan_id} not found")
return {"status": "skipped", "reason": "scan not found"}
if scan.state != StateChoices.COMPLETED:
logger.info(f"Scan {scan_id} not completed; skipping ephemeral reset")
return {"status": "skipped", "reason": "scan not completed"}
if not scan.is_full_scope():
logger.info(
f"Scan {scan_id} ran with scoping filters; skipping ephemeral reset"
)
return {"status": "skipped", "reason": "partial scan scope"}
# Race protection: if a newer completed full-scope scan exists for this
# provider, our ResourceScanSummary set is stale relative to the resources'
# current failed_findings_count values (which the newer scan already
# refreshed). Wiping based on the older scan would zero counts the newer
# scan just set. Skip and let the newer scan's reset task do the work; if
# this task was delayed in the queue, that's the correct outcome.
# `completed_at__isnull=False` is required: Postgres orders NULL first in
# DESC, so a sibling COMPLETED scan with a missing completed_at would sort
# as "newest" and incorrectly cause us to skip.
with rls_transaction(tenant_id):
latest_full_scope_scan_id = (
Scan.objects.filter(
tenant_id=tenant_id,
provider_id=scan.provider_id,
state=StateChoices.COMPLETED,
completed_at__isnull=False,
)
.order_by("-completed_at", "-inserted_at")
.values_list("id", flat=True)
.first()
)
if latest_full_scope_scan_id != scan.id:
logger.info(
f"Scan {scan_id} is not the latest completed scan for provider "
f"{scan.provider_id}; skipping ephemeral reset"
)
return {"status": "skipped", "reason": "newer scan exists"}
# Defensive gate: ResourceScanSummary rows are written by perform_prowler_scan
# via best-effort bulk_create. If those writes failed silently (or the scan
# genuinely produced resources but no summaries were persisted), the
# ~Exists(in_scan) anti-join below would classify EVERY resource for this
# provider as ephemeral and zero their counts. Bail loudly instead.
with rls_transaction(tenant_id):
summaries_present = ResourceScanSummary.objects.filter(
tenant_id=tenant_id, scan_id=scan_id
).exists()
if scan.unique_resource_count > 0 and not summaries_present:
logger.error(
f"Scan {scan_id} reports {scan.unique_resource_count} unique "
f"resources but no ResourceScanSummary rows are persisted; "
f"skipping ephemeral reset to avoid wiping valid counts"
)
return {"status": "skipped", "reason": "summaries missing"}
# Stays on the primary DB intentionally. ResourceScanSummary rows are
# written by perform_prowler_scan in the same chain that triggered this
# task, so replica lag could return an empty/partial summary set; a stale
# read here would classify every Resource as ephemeral and wipe valid
# failed_findings_count values on the primary. Same rationale as
# update_provider_compliance_scores below in this module.
# Materializing the ID list (rather than streaming the iterator into
# batched UPDATEs) is intentional: it lets the UPDATEs run in their own
# short rls_transactions instead of one long transaction holding row locks
# on every batch. At 500k UUIDs the peak memory is ~40 MB — acceptable for
# a Celery worker — and is the better trade-off versus a multi-second
# write-lock window blocking concurrent scans.
with rls_transaction(tenant_id):
in_scan = ResourceScanSummary.objects.filter(
tenant_id=tenant_id,
scan_id=scan_id,
resource_id=OuterRef("pk"),
)
ephemeral_ids = list(
Resource.objects.filter(
tenant_id=tenant_id,
provider_id=scan.provider_id,
failed_findings_count__gt=0,
)
.filter(~Exists(in_scan))
.values_list("id", flat=True)
.iterator(chunk_size=DJANGO_FINDINGS_BATCH_SIZE)
)
if not ephemeral_ids:
logger.info(f"No ephemeral resources for scan {scan_id}")
return {
"status": "completed",
"scan_id": str(scan_id),
"provider_id": str(scan.provider_id),
"reset": 0,
}
total_updated = 0
for batch, _ in batched(ephemeral_ids, DJANGO_FINDINGS_BATCH_SIZE):
# batched() always yields a final tuple, which is empty when the input
# length is an exact multiple of the batch size. Skip it so we don't
# issue a no-op UPDATE ... WHERE id IN ().
if not batch:
continue
with rls_transaction(tenant_id):
total_updated += Resource.objects.filter(
tenant_id=tenant_id,
id__in=batch,
failed_findings_count__gt=0,
).update(failed_findings_count=0)
logger.info(
f"Ephemeral resource reset for scan {scan_id}: "
f"{total_updated} resources zeroed for provider {scan.provider_id}"
)
return {
"status": "completed",
"scan_id": str(scan_id),
"provider_id": str(scan.provider_id),
"reset": total_updated,
}
+1 -37
View File
@@ -58,7 +58,6 @@ from tasks.jobs.scan import (
aggregate_findings,
create_compliance_requirements,
perform_prowler_scan,
reset_ephemeral_resource_findings_count,
update_provider_compliance_scores,
)
from tasks.utils import (
@@ -78,7 +77,6 @@ from prowler.lib.check.compliance_models import Compliance
from prowler.lib.outputs.compliance.generic.generic import GenericCompliance
from prowler.lib.outputs.finding import Finding as FindingOutput
logger = get_task_logger(__name__)
@@ -160,13 +158,6 @@ def _perform_scan_complete_tasks(tenant_id: str, scan_id: str, provider_id: str)
generate_outputs_task.si(
scan_id=scan_id, provider_id=provider_id, tenant_id=tenant_id
),
# post-scan task — runs in the parallel group so a
# failure cannot cascade into reports or integrations. Its only
# prerequisite is that perform_prowler_scan has committed
# ResourceScanSummary, which is true by the time this chain fires.
reset_ephemeral_resource_findings_count_task.si(
tenant_id=tenant_id, scan_id=scan_id
),
),
group(
# Use optimized task that generates both reports with shared queries
@@ -402,8 +393,7 @@ class AttackPathsScanRLSTask(RLSTask):
SDK initialization, or Neo4j configuration errors during setup).
"""
def on_failure(self, exc, task_id, args, kwargs, _einfo): # noqa: ARG002
del args # Required by Celery's Task.on_failure signature; not used.
def on_failure(self, exc, task_id, args, kwargs, _einfo):
tenant_id = kwargs.get("tenant_id")
scan_id = kwargs.get("scan_id")
@@ -800,32 +790,6 @@ def aggregate_daily_severity_task(tenant_id: str, scan_id: str):
return aggregate_daily_severity(tenant_id=tenant_id, scan_id=scan_id)
@shared_task(name="scan-reset-ephemeral-resources", queue="overview")
@handle_provider_deletion
def reset_ephemeral_resource_findings_count_task(tenant_id: str, scan_id: str):
"""Reset failed_findings_count for resources missing from a completed full-scope scan.
Failures are swallowed and returned as a status: this task lives inside the
post-scan group, and Celery propagates group-member exceptions into the next
chain step meaning a crash here would block compliance reports and
integrations. The reset is purely cosmetic (UI sort optimization), so a
bad run is logged and absorbed rather than allowed to cascade.
"""
try:
return reset_ephemeral_resource_findings_count(
tenant_id=tenant_id, scan_id=scan_id
)
except Exception as exc: # noqa: BLE001 — intentionally broad
logger.exception(
f"reset_ephemeral_resource_findings_count failed for scan {scan_id}: {exc}"
)
return {
"status": "failed",
"scan_id": str(scan_id),
"reason": str(exc),
}
@shared_task(base=RLSTask, name="scan-finding-group-summaries", queue="overview")
@set_tenant(keep_tenant=True)
@handle_provider_deletion
-314
View File
@@ -24,7 +24,6 @@ from tasks.jobs.scan import (
aggregate_findings,
create_compliance_requirements,
perform_prowler_scan,
reset_ephemeral_resource_findings_count,
update_provider_compliance_scores,
)
from tasks.utils import CustomEncoder
@@ -36,7 +35,6 @@ from api.models import (
MuteRule,
Provider,
Resource,
ResourceScanSummary,
Scan,
ScanSummary,
StateChoices,
@@ -4337,315 +4335,3 @@ class TestUpdateProviderComplianceScores:
assert any("provider_compliance_scores" in c for c in calls)
assert any("tenant_compliance_summaries" in c for c in calls)
assert any("pg_advisory_xact_lock" in c for c in calls)
class TestScanIsFullScope:
def _live_trigger(self):
return Scan.TriggerChoices.MANUAL
@pytest.mark.parametrize(
"scanner_args",
[
{},
{"unrelated": "value"},
{"checks": None},
{"services": []},
{"severities": ""},
],
)
def test_full_scope_when_no_filters_present(self, scanner_args):
scan = Scan(scanner_args=scanner_args, trigger=self._live_trigger())
assert scan.is_full_scope() is True
def test_full_scope_covers_every_sdk_kwarg(self):
# Lock the predicate to whatever ProwlerScan's __init__ exposes today.
# If the SDK adds a new filter, this test still passes via the
# introspection-driven derivation; if it adds a non-filter kwarg
# (e.g. provider-like), keep the exclusion list in sync in models.py.
from prowler.lib.scan.scan import Scan as ProwlerScan
import inspect
expected = tuple(
name
for name in inspect.signature(ProwlerScan.__init__).parameters
if name not in ("self", "provider")
)
assert Scan.get_scoping_scanner_arg_keys() == expected
# Spot-check a few well-known filters survive the introspection.
assert "checks" in expected
assert "services" in expected
assert "severities" in expected
def test_partial_scope_for_each_sdk_filter(self):
for key in Scan.get_scoping_scanner_arg_keys():
scan = Scan(scanner_args={key: ["x"]}, trigger=self._live_trigger())
assert scan.is_full_scope() is False, f"{key} should mark scan as partial"
def test_imported_scan_is_never_full_scope(self):
# Forward-defensive: any trigger outside LIVE_SCAN_TRIGGERS (e.g. a
# future "imported" trigger) must never qualify, even with empty args.
scan = Scan(scanner_args={}, trigger="imported")
assert scan.is_full_scope() is False
def test_handles_none_scanner_args(self):
scan = Scan(scanner_args=None, trigger=self._live_trigger())
assert scan.is_full_scope() is True
@pytest.mark.django_db
class TestResetEphemeralResourceFindingsCount:
def _make_scan_summary(self, tenant_id, scan_id, resource):
return ResourceScanSummary.objects.create(
tenant_id=tenant_id,
scan_id=scan_id,
resource_id=resource.id,
service=resource.service,
region=resource.region,
resource_type=resource.type,
)
def test_resets_only_resources_missing_from_full_scope_scan(
self, tenants_fixture, scans_fixture, providers_fixture, resources_fixture
):
tenant, *_ = tenants_fixture
scan1, scan2, *_ = scans_fixture
resource1, resource2, resource3 = resources_fixture
Resource.objects.filter(id=resource1.id).update(failed_findings_count=3)
Resource.objects.filter(id=resource2.id).update(failed_findings_count=5)
Resource.objects.filter(id=resource3.id).update(failed_findings_count=7)
# Only resource1 was scanned in scan1; resource2 is ephemeral.
self._make_scan_summary(tenant.id, scan1.id, resource1)
result = reset_ephemeral_resource_findings_count(
tenant_id=str(tenant.id), scan_id=str(scan1.id)
)
assert result["status"] == "completed"
assert result["reset"] == 1
resource1.refresh_from_db()
resource2.refresh_from_db()
resource3.refresh_from_db()
assert resource1.failed_findings_count == 3
assert resource2.failed_findings_count == 0
# Other provider's resource is never touched.
assert resource3.failed_findings_count == 7
def test_skips_when_scan_not_completed(
self, tenants_fixture, scans_fixture, resources_fixture
):
tenant, *_ = tenants_fixture
scan1, *_ = scans_fixture
resource1, resource2, _ = resources_fixture
Scan.objects.filter(id=scan1.id).update(state=StateChoices.EXECUTING)
Resource.objects.filter(id=resource2.id).update(failed_findings_count=5)
result = reset_ephemeral_resource_findings_count(
tenant_id=str(tenant.id), scan_id=str(scan1.id)
)
assert result["status"] == "skipped"
assert result["reason"] == "scan not completed"
resource2.refresh_from_db()
assert resource2.failed_findings_count == 5
def test_skips_when_scan_has_scoping_filters(
self, tenants_fixture, scans_fixture, resources_fixture
):
tenant, *_ = tenants_fixture
scan1, *_ = scans_fixture
_, resource2, _ = resources_fixture
Scan.objects.filter(id=scan1.id).update(scanner_args={"checks": ["check1"]})
Resource.objects.filter(id=resource2.id).update(failed_findings_count=5)
result = reset_ephemeral_resource_findings_count(
tenant_id=str(tenant.id), scan_id=str(scan1.id)
)
assert result["status"] == "skipped"
assert result["reason"] == "partial scan scope"
resource2.refresh_from_db()
assert resource2.failed_findings_count == 5
def test_skips_when_scan_not_found(self, tenants_fixture):
tenant, *_ = tenants_fixture
result = reset_ephemeral_resource_findings_count(
tenant_id=str(tenant.id), scan_id=str(uuid.uuid4())
)
assert result["status"] == "skipped"
assert result["reason"] == "scan not found"
def test_skips_when_newer_scan_completed_for_same_provider(
self, tenants_fixture, scans_fixture, providers_fixture, resources_fixture
):
# If a newer completed scan exists for the same provider, our
# ResourceScanSummary set is stale relative to the resources' current
# counts, and applying the diff would corrupt them.
from datetime import timedelta
tenant, *_ = tenants_fixture
scan1, *_ = scans_fixture
provider, *_ = providers_fixture
_, resource2, _ = resources_fixture
Resource.objects.filter(id=resource2.id).update(failed_findings_count=5)
# Create a newer COMPLETED scan for the same provider, with an
# explicit completed_at strictly after scan1's so ordering is
# deterministic regardless of clock resolution.
newer_completed_at = scan1.completed_at + timedelta(minutes=5)
Scan.objects.create(
name="Newer Scan",
provider=provider,
trigger=Scan.TriggerChoices.MANUAL,
state=StateChoices.COMPLETED,
tenant_id=tenant.id,
started_at=newer_completed_at,
completed_at=newer_completed_at,
)
result = reset_ephemeral_resource_findings_count(
tenant_id=str(tenant.id), scan_id=str(scan1.id)
)
assert result["status"] == "skipped"
assert result["reason"] == "newer scan exists"
resource2.refresh_from_db()
assert resource2.failed_findings_count == 5
def test_does_not_touch_other_providers_resources(
self, tenants_fixture, scans_fixture, providers_fixture, resources_fixture
):
tenant, *_ = tenants_fixture
scan1, *_ = scans_fixture
_, _, resource3 = resources_fixture
# resource3 belongs to provider2 with failed_findings_count > 0 and is
# not in scan1's summary. It MUST NOT be reset.
Resource.objects.filter(id=resource3.id).update(failed_findings_count=9)
result = reset_ephemeral_resource_findings_count(
tenant_id=str(tenant.id), scan_id=str(scan1.id)
)
assert result["status"] == "completed"
assert result["reset"] == 0
resource3.refresh_from_db()
assert resource3.failed_findings_count == 9
def test_resources_already_zero_are_not_rewritten(
self, tenants_fixture, scans_fixture, resources_fixture
):
tenant, *_ = tenants_fixture
scan1, *_ = scans_fixture
resource1, resource2, _ = resources_fixture
# Both resources already at 0, neither in summary -> nothing to update.
Resource.objects.filter(id=resource1.id).update(failed_findings_count=0)
Resource.objects.filter(id=resource2.id).update(failed_findings_count=0)
result = reset_ephemeral_resource_findings_count(
tenant_id=str(tenant.id), scan_id=str(scan1.id)
)
assert result["status"] == "completed"
assert result["reset"] == 0
def test_skips_when_summaries_missing_for_scan_with_resources(
self, tenants_fixture, scans_fixture, resources_fixture
):
# Catastrophic guard: if a scan reports unique_resource_count > 0 but
# no ResourceScanSummary rows are persisted (e.g. bulk_create silently
# failed), the anti-join would classify EVERY resource as ephemeral
# and zero their counts. The gate must skip and preserve the data.
tenant, *_ = tenants_fixture
scan1, *_ = scans_fixture
resource1, resource2, _ = resources_fixture
Scan.objects.filter(id=scan1.id).update(unique_resource_count=10)
Resource.objects.filter(id=resource1.id).update(failed_findings_count=3)
Resource.objects.filter(id=resource2.id).update(failed_findings_count=5)
result = reset_ephemeral_resource_findings_count(
tenant_id=str(tenant.id), scan_id=str(scan1.id)
)
assert result["status"] == "skipped"
assert result["reason"] == "summaries missing"
resource1.refresh_from_db()
resource2.refresh_from_db()
assert resource1.failed_findings_count == 3
assert resource2.failed_findings_count == 5
def test_ignores_sibling_scan_with_null_completed_at(
self, tenants_fixture, scans_fixture, providers_fixture, resources_fixture
):
# Postgres orders NULL first in DESC; a sibling COMPLETED scan with a
# missing completed_at must not be treated as the latest scan and
# cause us to incorrectly skip the reset.
tenant, *_ = tenants_fixture
scan1, *_ = scans_fixture
provider, *_ = providers_fixture
resource1, resource2, _ = resources_fixture
Resource.objects.filter(id=resource2.id).update(failed_findings_count=5)
self._make_scan_summary(tenant.id, scan1.id, resource1)
Scan.objects.create(
name="Ghost Scan",
provider=provider,
trigger=Scan.TriggerChoices.MANUAL,
state=StateChoices.COMPLETED,
tenant_id=tenant.id,
started_at=scan1.completed_at,
completed_at=None,
)
result = reset_ephemeral_resource_findings_count(
tenant_id=str(tenant.id), scan_id=str(scan1.id)
)
assert result["status"] == "completed"
assert result["reset"] == 1
resource2.refresh_from_db()
assert resource2.failed_findings_count == 0
def test_batches_updates_when_many_ephemeral_resources(
self, tenants_fixture, scans_fixture, resources_fixture
):
# Forces multiple batches to confirm the chunked UPDATE path executes
# cleanly and the count is the sum across batches.
tenant, *_ = tenants_fixture
scan1, *_ = scans_fixture
resource1, resource2, _ = resources_fixture
Resource.objects.filter(id=resource1.id).update(failed_findings_count=2)
Resource.objects.filter(id=resource2.id).update(failed_findings_count=4)
# No ResourceScanSummary -> both resource1 and resource2 are ephemeral.
# Force a 1-row batch via the shared findings batch size knob.
with patch("tasks.jobs.scan.DJANGO_FINDINGS_BATCH_SIZE", 1):
result = reset_ephemeral_resource_findings_count(
tenant_id=str(tenant.id), scan_id=str(scan1.id)
)
assert result["status"] == "completed"
assert result["reset"] == 2
resource1.refresh_from_db()
resource2.refresh_from_db()
assert resource1.failed_findings_count == 0
assert resource2.failed_findings_count == 0
@@ -215,6 +215,3 @@ Also is important to keep all code examples as short as possible, including the
| e5 | M365 and Azure Entra checks enabled by or dependent on an E5 license (e.g., advanced threat protection, audit, DLP, and eDiscovery) |
| privilege-escalation | Detects IAM policies or permissions that allow identities to elevate their privileges beyond their intended scope, potentially gaining administrator or higher-level access through specific action combinations |
| ec2-imdsv1 | Identifies EC2 instances using Instance Metadata Service version 1 (IMDSv1), which is vulnerable to SSRF attacks and should be replaced with IMDSv2 for enhanced security |
| vercel-hobby-plan | Vercel checks whose audited feature is available on the Hobby plan (and therefore also on Pro and Enterprise plans) |
| vercel-pro-plan | Vercel checks whose audited feature requires a Pro plan or higher, including features also available on Enterprise or via supported paid add-ons for Pro plans |
| vercel-enterprise-plan | Vercel checks whose audited feature requires the Enterprise plan |
+6 -20
View File
@@ -27,28 +27,14 @@ The most common high level steps to create a new check are:
### Naming Format for Checks
If you already know the check name when creating a request or implementing a check, use a descriptive identifier with lowercase letters and underscores only.
Recommended patterns:
- `<service>_<resource>_<best_practice>`
Checks must be named following the format: `service_subservice_resource_action`.
The name components are:
- `service` The main service or product area being audited (e.g., ec2, entra, iam, bedrock).
- `resource` The resource, feature, or configuration being evaluated. It can be a single word or a compound phrase joined with underscores (e.g., instance, policy, guardrail, sensitive_information_filter).
- `best_practice` The expected secure state or best practice being checked (e.g., enabled, encrypted, restricted, configured, not_publicly_accessible).
Additional guidance:
- Use underscores only. Do not use hyphens.
- Keep the name specific enough to describe the behavior of the check.
- The first segment should match the service or product area whenever possible.
Examples:
- `s3_bucket_versioning_enabled`
- `bedrock_guardrail_sensitive_information_filter_enabled`
- `service` The main service being audited (e.g., ec2, entra, iam, etc.)
- `subservice` An individual component or subset of functionality within the service that is being audited. This may correspond to a shortened version of the class attribute accessed within the check. If there is no subservice, just omit.
- `resource` The specific resource type being evaluated (e.g., instance, policy, role, etc.)
- `action` The security aspect or configuration being checked (e.g., public, encrypted, enabled, etc.)
### File Creation
@@ -401,7 +387,7 @@ Provides both code examples and best practice recommendations for addressing the
#### Categories
One or more functional groupings used for execution filtering (e.g., `internet-exposed`). Categories must match the predefined values enforced by `CheckMetadata`; adding a new category requires updating the validator and the metadata documentation.
One or more functional groupings used for execution filtering (e.g., `internet-exposed`). You can define new categories just by adding to this field.
For the complete list of available categories, see [Categories Guidelines](/developer-guide/check-metadata-guidelines#categories-guidelines).
-131
View File
@@ -1,131 +0,0 @@
---
title: 'Prowler Studio'
---
**Prowler Studio is an AI workflow that ensures Claude Code follows Prowler's skills, guardrails, and best practices when creating new security checks.** What lands in the resulting pull request is consistent, tested, and ready for human review — not half-correct boilerplate that needs to be rewritten.
<Info>
**Contributor Tool**: Prowler Studio is a workflow for advanced contributors adding new Prowler security checks. It is not part of Prowler Cloud, Prowler App, or Prowler CLI.
</Info>
<Warning>
**Preview Feature**: Prowler Studio is under active development and breaking changes are expected. Please report issues or share feedback on [GitHub](https://github.com/prowler-cloud/prowler-studio/issues) or in the [Slack community](https://goto.prowler.com/slack).
</Warning>
<Card title="Prowler Studio Repository" icon="github" href="https://github.com/prowler-cloud/prowler-studio" horizontal>
Clone the source code, install Prowler Studio, and explore the agent workflow in detail.
</Card>
## The Problem
Adding a new check to [Prowler](https://github.com/prowler-cloud/prowler) is more than writing detection logic. A correct check has to:
- Match Prowler's exact service and check folder structure and naming conventions
- Wire up metadata, severity, remediation, tests, and compliance mappings
- Mirror the patterns used by the hundreds of existing checks in the same provider
- Actually load when Prowler scans for available checks — silent structural mistakes are easy to make
Asking a general-purpose AI assistant to do this usually means guessing. It misses conventions, skips tests, or invents structure that looks right but does not load. The result is a half-correct PR that needs to be reviewed line by line or rewritten.
## The Solution
Prowler Studio enforces the workflow end-to-end. Describe the check once — a markdown ticket, a Jira issue, or a GitHub issue — and the workflow:
1. **Loads Prowler-specific skills into every agent.** Every step starts with the same context an experienced Prowler engineer would have in mind. See [AI Skills System](/developer-guide/ai-skills) for how skills are structured.
2. **Runs specialized agents in sequence.** Implementation → testing → compliance mapping → review → PR creation. Each agent has one job and a tight scope.
3. **Verifies as it goes.** The check must load in Prowler. Tests must pass. If something fails, the agent fixes it and re-runs (up to a bounded number of attempts) before moving on.
4. **Produces a complete pull request.** Branch, passing check, tests, compliance mappings, and a pull request waiting for human review.
The result is a consistent starting point, every time, on every supported provider.
## Quick Start
### Install
Prowler Studio requires [`uv`](https://docs.astral.sh/uv/getting-started/installation/) — see the official [installation guide](https://docs.astral.sh/uv/getting-started/installation/).
```bash
git clone https://github.com/prowler-cloud/prowler-studio
cd prowler-studio
uv sync
source .venv/bin/activate
```
### Describe the Check
A ticket is a structured markdown description of the check to create. It is the only input the workflow needs; every agent (implementation, testing, compliance mapping, review, PR creation) uses it as the source of truth, so the more concrete it is, the closer the first PR will land to the desired outcome.
The ticket can be supplied in three ways:
- **Local markdown file** → `--ticket path/to/ticket.md`
- **Jira issue** → `--jira-url https://...` (uses the issue body)
- **GitHub issue** → `--github-url https://...` (uses the issue body)
The content should follow the **New Check Request** template:
- The local copy at [`check_ticket_template.md`](https://github.com/prowler-cloud/prowler-studio/blob/main/check_ticket_template.md) covers `--ticket` and Jira tickets.
- A prefilled GitHub form is also available: [Create a New Check Request issue](https://github.com/prowler-cloud/prowler/issues/new?template=new-check-request.yml).
Sections marked *Optional* can be skipped; everything else helps the agents make the right decisions.
### Run the Workflow
From a local markdown ticket:
```bash
prowler-studio --ticket check_ticket.md
```
From a Jira ticket:
```bash
prowler-studio --jira-url https://mycompany.atlassian.net/browse/PROJ-123
```
From a GitHub issue:
```bash
prowler-studio --github-url https://github.com/owner/repo/issues/123
```
<Note>
Provide exactly one of `--ticket`, `--jira-url`, or `--github-url`.
</Note>
Keep changes local (no push, no pull request):
```bash
prowler-studio -b feat/my-check --ticket check_ticket.md --local
```
### What You Get
After a successful run the working environment contains:
- A new branch on a clean Prowler worktree containing the check, metadata, tests, and compliance mappings
- A pull request opened against Prowler (skipped with `--local`)
- A timestamped log file under `logs/` capturing every step the agents took
## CLI Options
| Option | Short | Description |
|--------|-------|-------------|
| `--branch` | `-b` | Branch name (default: `feat/<ticket>-<check_name>` or `feat/<check_name>`) |
| `--ticket` | `-t` | Path to a markdown check ticket file |
| `--jira-url` | `-j` | Jira ticket URL (e.g., `https://mycompany.atlassian.net/browse/PROJ-123`) |
| `--github-url` | `-g` | GitHub issue URL (e.g., `https://github.com/owner/repo/issues/123`) |
| `--working-dir` | `-w` | Working directory for the Prowler clone (default: `./working`) |
| `--no-worktree` | | Legacy mode — work directly on the main clone instead of using worktrees |
| `--cleanup-worktree` | | Remove the worktree after a successful pull request is created |
| `--local` | | Keep changes local — skip push and pull request creation |
## Configuration
Set these environment variables depending on the input source:
| Variable | When Needed | Purpose |
|----------|-------------|---------|
| `GITHUB_TOKEN` | `--github-url` (recommended) | Higher GitHub API rate limits and access to private issues |
| `JIRA_SITE_URL` | `--jira-url` | Jira site, e.g. `https://mycompany.atlassian.net` |
| `JIRA_EMAIL` | `--jira-url` | Email of the Jira account used to fetch the ticket |
| `JIRA_API_TOKEN` | `--jira-url` | API token for the Jira account |
+1 -2
View File
@@ -365,8 +365,7 @@
"developer-guide/security-compliance-framework",
"developer-guide/lighthouse-architecture",
"developer-guide/mcp-server",
"developer-guide/ai-skills",
"developer-guide/prowler-studio"
"developer-guide/ai-skills"
]
},
{
@@ -121,8 +121,8 @@ To update the environment file:
Edit the `.env` file and change version values:
```env
PROWLER_UI_VERSION="5.25.2"
PROWLER_API_VERSION="5.25.2"
PROWLER_UI_VERSION="5.24.0"
PROWLER_API_VERSION="5.24.0"
```
<Note>
-34
View File
@@ -159,40 +159,6 @@ When these environment variables are set, the API will use them directly instead
A fix addressing this permission issue is being evaluated in [PR #9953](https://github.com/prowler-cloud/prowler/pull/9953).
</Note>
### Scan Stuck in Executing State After Worker Crash
When running Prowler App via Docker Compose, a scan may remain indefinitely in the `executing` state if the worker process crashes (for example, due to an Out of Memory condition) before it can update the scan status. Since it is not currently possible to cancel a scan in `executing` state through the UI, the workaround is to manually update the scan record in the database.
**Root Cause:**
The Celery worker process terminates unexpectedly (OOM, node failure, etc.) before transitioning the scan state to `completed` or `failed`. The scan record remains in `executing` with no active process to advance it.
**Solution:**
Connect to the database using the `prowler_admin` user. Due to Row-Level Security (RLS), the default database user cannot see scan records — you must use `prowler_admin`:
```bash
psql -U prowler_admin -d prowler_db
```
Identify the stuck scan by filtering for scans in `executing` state:
```sql
SELECT id, name, state, started_at FROM scans WHERE state = 'executing';
```
Update the scan state to `failed` using the scan ID:
```sql
UPDATE scans SET state = 'failed' WHERE id = '<scan-id>';
```
After this change, the scan will appear as failed in the UI and you can launch a new scan.
<Note>
A feature to cancel executing scans directly from the UI is being tracked in [GitHub Issue #6893](https://github.com/prowler-cloud/prowler/issues/6893).
</Note>
### SAML/OAuth ACS URL Incorrect When Running Behind a Proxy or Load Balancer
See [GitHub Issue #9724](https://github.com/prowler-cloud/prowler/issues/9724) for more details.
@@ -0,0 +1,47 @@
---
title: 'Prowler Check Kreator'
---
<Note>
Currently, this tool is only available for creating checks for the AWS provider.
</Note>
<Note>
If you are looking for a way to create new checks for all the supported providers, you can use [Prowler Studio](https://github.com/prowler-cloud/prowler-studio), it is an AI-powered toolkit for generating and managing security checks for Prowler (better version of the Check Kreator).
</Note>
## Introduction
**Prowler Check Kreator** is a utility designed to streamline the creation of new checks for Prowler. This tool generates all necessary files required to add a new check to the Prowler repository. Specifically, it creates:
- A dedicated folder for the check.
- The main check script.
- A metadata file with essential details.
- A folder and file structure for testing the check.
## Usage
To use the tool, execute the main script with the following command:
```bash
python util/prowler_check_kreator/prowler_check_kreator.py <prowler_provider> <check_name>
```
Parameters:
- `<prowler_provider>`: Currently only AWS is supported.
- `<check_name>`: The name you wish to assign to the new check.
## AI integration
This tool optionally integrates AI to assist in generating the check code and metadata file content. When AI assistance is chosen, the tool uses [Gemini](https://gemini.google.com/) to produce preliminary code and metadata.
<Note>
For this feature to work, you must have the library `google-generativeai` installed in your Python environment.
</Note>
<Warning>
AI-generated code and metadata might contain errors or require adjustments to align with specific Prowler requirements. Carefully review all AI-generated content before committing.
</Warning>
To enable AI assistance, simply confirm when prompted by the tool. Additionally, ensure that the `GEMINI_API_KEY` environment variable is set with a valid Gemini API key. For instructions on obtaining your API key, refer to the [Gemini documentation](https://ai.google.dev/gemini-api/docs/api-key).
@@ -160,25 +160,3 @@ Prowler for Vercel includes security checks across the following services:
| **Project** | Deployment protection, environment variable security, fork protection, and skew protection |
| **Security** | Web Application Firewall (WAF), rate limiting, IP blocking, and managed rulesets |
| **Team** | SSO enforcement, directory sync, member access, and invitation hygiene |
## Checks With Explicit Plan-Based Behavior
Prowler currently includes 26 Vercel checks. The 11 checks below have explicit billing-plan handling in the provider metadata or check logic. When the scanned scope reports a billing plan, Prowler adds plan-aware context to findings for these checks. If the API does not expose the required configuration, Prowler may return `MANUAL` and require verification in the Vercel dashboard.
| Check ID | Hobby | Pro | Enterprise | Notes |
|----------|-------|-----|------------|-------|
| `project_password_protection_enabled` | Not available | Available as a paid add-on | Available | Checks password protection for deployments |
| `project_production_deployment_protection_enabled` | Not available | Available with supported paid deployment protection options | Available | Checks protection for production deployments |
| `project_skew_protection_enabled` | Not available | Available | Available | Checks skew protection during rollouts |
| `security_custom_rules_configured` | Not available | Available | Available | Returns `MANUAL` when the firewall configuration cannot be assessed from the API |
| `security_ip_blocking_rules_configured` | Not available | Available | Available | Returns `MANUAL` when the firewall configuration cannot be assessed from the API |
| `team_saml_sso_enabled` | Not available | Available | Available | Checks team SAML SSO configuration |
| `team_saml_sso_enforced` | Not available | Available | Available | Checks SAML SSO enforcement for all team members |
| `team_directory_sync_enabled` | Not available | Not available | Available | Checks SCIM directory sync |
| `security_managed_rulesets_enabled` | Bot Protection and AI Bots managed rulesets | Bot Protection and AI Bots managed rulesets | All managed rulesets, including OWASP Core Ruleset | Returns `MANUAL` when the firewall configuration cannot be assessed from the API |
| `security_rate_limiting_configured` | Not available | Available | Available | Returns `MANUAL` when the firewall configuration cannot be assessed from the API |
| `security_waf_enabled` | Not available | Available | Available | Returns `MANUAL` when the firewall configuration cannot be assessed from the API |
<Note>
The five firewall-related checks (`security_waf_enabled`, `security_custom_rules_configured`, `security_ip_blocking_rules_configured`, `security_rate_limiting_configured`, and `security_managed_rulesets_enabled`) return `MANUAL` when the firewall configuration endpoint is not accessible from the API. The other 15 current Vercel checks do not currently include plan-specific handling in provider logic, but every Vercel check includes exactly one billing-plan metadata category (`vercel-hobby-plan`, `vercel-pro-plan`, or `vercel-enterprise-plan`) alongside its functional security category.
</Note>
@@ -227,7 +227,6 @@ Assign administrative permissions by selecting from the following options:
| Manage Integrations | All | Add or modify the Prowler Integrations. |
| Manage Ingestions | Prowler Cloud | Allow or deny the ability to submit findings ingestion batches via the API. |
| Manage Billing | Prowler Cloud | Access and manage billing settings and subscription information. |
| Manage Alerts | Prowler Cloud | Create, edit, and delete alert rules and recipients. |
<Note>
The **Scope** column indicates where each permission applies. **All** means the permission is available in both Prowler Cloud and Self-Managed deployments. **Prowler Cloud** indicates permissions that are specific to [Prowler Cloud](https://cloud.prowler.com/sign-in).
@@ -242,5 +241,3 @@ The following permissions are available exclusively in **Prowler Cloud**:
**Manage Ingestions:** Submit and manage findings ingestion jobs via the API. Required to upload OCSF scan results using the `--push-to-cloud` CLI flag or the ingestion endpoints. See [Import Findings](/user-guide/tutorials/prowler-app-import-findings) for details.
**Manage Billing:** Access and manage billing settings, subscription plans, and payment methods.
**Manage Alerts:** Create, edit, and delete alert rules and recipients used to deliver scan-result digests via email.
@@ -0,0 +1,51 @@
---
title: 'Prowler Check Kreator'
---
<Note>
Currently, this tool is only available for creating checks for the AWS provider.
</Note>
<Note>
If you are looking for a way to create new checks for all the supported providers, you can use [Prowler Studio](https://github.com/prowler-cloud/prowler-studio), it is an AI-powered toolkit for generating and managing security checks for Prowler (better version of the Check Kreator).
</Note>
## Introduction
**Prowler Check Kreator** is a utility designed to streamline the creation of new checks for Prowler. This tool generates all necessary files required to add a new check to the Prowler repository. Specifically, it creates:
- A dedicated folder for the check.
- The main check script.
- A metadata file with essential details.
- A folder and file structure for testing the check.
## Usage
To use the tool, execute the main script with the following command:
```bash
python util/prowler_check_kreator/prowler_check_kreator.py <prowler_provider> <check_name>
```
Parameters:
- `<prowler_provider>`: Currently only AWS is supported.
- `<check_name>`: The name you wish to assign to the new check.
## AI integration
This tool optionally integrates AI to assist in generating the check code and metadata file content. When AI assistance is chosen, the tool uses [Gemini](https://gemini.google.com/) to produce preliminary code and metadata.
<Note>
For this feature to work, you must have the library `google-generativeai` installed in your Python environment.
</Note>
<Warning>
AI-generated code and metadata might contain errors or require adjustments to align with specific Prowler requirements. Carefully review all AI-generated content before committing.
</Warning>
To enable AI assistance, simply confirm when prompted by the tool. Additionally, ensure that the `GEMINI_API_KEY` environment variable is set with a valid Gemini API key. For instructions on obtaining your API key, refer to the [Gemini documentation](https://ai.google.dev/gemini-api/docs/api-key).
+2 -23
View File
@@ -2,32 +2,11 @@
All notable changes to the **Prowler SDK** are documented in this file.
## [5.26.0] (Prowler UNRELEASED)
### 🚀 Added
- `bedrock_guardrails_configured` check for AWS provider [(#10844)](https://github.com/prowler-cloud/prowler/pull/10844)
- Universal compliance pipeline integrated into the CLI: `--list-compliance` and `--list-compliance-requirements` show universal frameworks, and CSV plus OCSF outputs are generated for any framework declaring a `TableConfig` [(#10301)](https://github.com/prowler-cloud/prowler/pull/10301)
- ASD Essential Eight Maturity Model compliance framework for AWS (Maturity Level One, Nov 2023) [(#10808)](https://github.com/prowler-cloud/prowler/pull/10808)
- Update Vercel checks to return personalized finding status extended depending on billing plan and classify them with billing-plan categories [(#10663)](https://github.com/prowler-cloud/prowler/pull/10663)
- `bedrock_prompt_management_exists` check for AWS provider [(#10878)](https://github.com/prowler-cloud/prowler/pull/10878)
### 🔄 Changed
- Azure Network Watcher flow log checks now require workspace-backed Traffic Analytics for `network_flow_log_captured_sent` and align metadata with VNet-compatible flow log guidance [(#10645)](https://github.com/prowler-cloud/prowler/pull/10645)
- Azure compliance entries for legacy Network Watcher flow log controls now use retirement-aware guidance and point new deployments to VNet flow logs [(#10937)](https://github.com/prowler-cloud/prowler/pull/10937)
- AWS CodeBuild service now batches `BatchGetProjects` and `BatchGetBuilds` calls per region (up to 100 items per call) to reduce API call volume and prevent throttling-induced false positives in `codebuild_project_not_publicly_accessible` [(#10639)](https://github.com/prowler-cloud/prowler/pull/10639)
- `display_compliance_table` dispatch switched from substring `in` checks to `startswith` to prevent false matches between similarly named frameworks (e.g. `cisa` vs `cis`) [(#10301)](https://github.com/prowler-cloud/prowler/pull/10301)
## [5.25.3] (Prowler UNRELEASED)
### 🐞 Fixed
- AWS SDK test isolation: autouse `mock_aws` fixture and leak detector in `conftest.py` to prevent tests from hitting real AWS endpoints, with idempotent organization setup for tests calling `set_mocked_aws_provider` multiple times [(#10605)](https://github.com/prowler-cloud/prowler/pull/10605)
- AWS `boto` user agent extra is now applied to every client [(#10944)](https://github.com/prowler-cloud/prowler/pull/10944)
- Image provider connection check no longer fails with a misleading `host='https'` resolution error when the registry URL includes an `http://` or `https://` scheme prefix [(#10950)](https://github.com/prowler-cloud/prowler/pull/10950)
### 🔐 Security
- Parser-mismatch SSRF in image provider registry auth where crafted bearer-token realms and pagination links could force requests to internal addresses and leak credentials cross-origin [(#10945)](https://github.com/prowler-cloud/prowler/pull/10945)
- Oracle cloud identity scans now scan known or supplied regions to better support non ashburn tenancies [(#10529)](https://github.com/prowler-cloud/prowler/pull/10529)
---
+13 -56
View File
@@ -45,10 +45,7 @@ from prowler.lib.check.check import (
)
from prowler.lib.check.checks_loader import load_checks_to_execute
from prowler.lib.check.compliance import update_checks_metadata_with_compliance
from prowler.lib.check.compliance_models import (
Compliance,
get_bulk_compliance_frameworks_universal,
)
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.check.custom_checks_metadata import (
parse_custom_checks_metadata_file,
update_checks_metadata,
@@ -78,10 +75,7 @@ from prowler.lib.outputs.compliance.cis.cis_oraclecloud import OracleCloudCIS
from prowler.lib.outputs.compliance.cisa_scuba.cisa_scuba_googleworkspace import (
GoogleWorkspaceCISASCuBA,
)
from prowler.lib.outputs.compliance.compliance import (
display_compliance_table,
process_universal_compliance_frameworks,
)
from prowler.lib.outputs.compliance.compliance import display_compliance_table
from prowler.lib.outputs.compliance.csa.csa_alibabacloud import AlibabaCloudCSA
from prowler.lib.outputs.compliance.csa.csa_aws import AWSCSA
from prowler.lib.outputs.compliance.csa.csa_azure import AzureCSA
@@ -90,9 +84,6 @@ from prowler.lib.outputs.compliance.csa.csa_oraclecloud import OracleCloudCSA
from prowler.lib.outputs.compliance.ens.ens_aws import AWSENS
from prowler.lib.outputs.compliance.ens.ens_azure import AzureENS
from prowler.lib.outputs.compliance.ens.ens_gcp import GCPENS
from prowler.lib.outputs.compliance.essential_eight.essential_eight_aws import (
EssentialEightAWS,
)
from prowler.lib.outputs.compliance.generic.generic import GenericCompliance
from prowler.lib.outputs.compliance.iso27001.iso27001_aws import AWSISO27001
from prowler.lib.outputs.compliance.iso27001.iso27001_azure import AzureISO27001
@@ -244,8 +235,6 @@ def prowler():
# Load compliance frameworks
logger.debug("Loading compliance frameworks from .json files")
universal_frameworks = {}
# Skip compliance frameworks for external-tool providers
if provider not in EXTERNAL_TOOL_PROVIDERS:
bulk_compliance_frameworks = Compliance.get_bulk(provider)
@@ -253,8 +242,6 @@ def prowler():
bulk_checks_metadata = update_checks_metadata_with_compliance(
bulk_compliance_frameworks, bulk_checks_metadata
)
# Load universal compliance frameworks for new rendering pipeline
universal_frameworks = get_bulk_compliance_frameworks_universal(provider)
# Update checks metadata if the --custom-checks-metadata-file is present
custom_checks_metadata = None
@@ -267,12 +254,12 @@ def prowler():
)
if args.list_compliance:
all_frameworks = {**bulk_compliance_frameworks, **universal_frameworks}
print_compliance_frameworks(all_frameworks)
print_compliance_frameworks(bulk_compliance_frameworks)
sys.exit()
if args.list_compliance_requirements:
all_frameworks = {**bulk_compliance_frameworks, **universal_frameworks}
print_compliance_requirements(all_frameworks, args.list_compliance_requirements)
print_compliance_requirements(
bulk_compliance_frameworks, args.list_compliance_requirements
)
sys.exit()
# Load checks to execute
@@ -289,7 +276,6 @@ def prowler():
provider=provider,
list_checks=getattr(args, "list_checks", False)
or getattr(args, "list_checks_json", False),
universal_frameworks=universal_frameworks,
)
# if --list-checks-json, dump a json file and exit
@@ -638,29 +624,15 @@ def prowler():
)
# Compliance Frameworks
# Source the framework listing from the union of `bulk_compliance_frameworks`
# and `universal_frameworks` so universal-only frameworks (e.g.
# `prowler/compliance/csa_ccm_4.0.json`) — which `Compliance.get_bulk(provider)`
# does not load — still reach `process_universal_compliance_frameworks` below.
# The provider-specific block subtracts the names handled by the universal
# processor so the legacy per-provider handlers only see frameworks that the
# bulk loader actually resolved.
# Source the framework listing from `bulk_compliance_frameworks.keys()`
# so it is by construction a subset of what the bulk loader can resolve.
# `get_available_compliance_frameworks(provider)` also discovers top-level
# multi-provider universal JSONs (e.g. `prowler/compliance/csa_ccm_4.0.json`)
# which `Compliance.get_bulk(provider)` does not load, and which the legacy
# output handlers below cannot consume — using it as the source produced
input_compliance_frameworks = set(output_options.output_modes).intersection(
set(bulk_compliance_frameworks.keys()) | set(universal_frameworks.keys())
bulk_compliance_frameworks.keys()
)
# ── Universal compliance frameworks (provider-agnostic) ──
universal_processed = process_universal_compliance_frameworks(
input_compliance_frameworks=input_compliance_frameworks,
universal_frameworks=universal_frameworks,
finding_outputs=finding_outputs,
output_directory=output_options.output_directory,
output_filename=output_options.output_filename,
provider=provider,
generated_outputs=generated_outputs,
)
input_compliance_frameworks -= universal_processed
if provider == "aws":
for compliance_name in input_compliance_frameworks:
if compliance_name.startswith("cis_"):
@@ -676,18 +648,6 @@ def prowler():
)
generated_outputs["compliance"].append(cis)
cis.batch_write_data_to_file()
elif compliance_name.startswith("essential_eight"):
filename = (
f"{output_options.output_directory}/compliance/"
f"{output_options.output_filename}_{compliance_name}.csv"
)
essential_eight = EssentialEightAWS(
findings=finding_outputs,
compliance=bulk_compliance_frameworks[compliance_name],
file_path=filename,
)
generated_outputs["compliance"].append(essential_eight)
essential_eight.batch_write_data_to_file()
elif compliance_name == "mitre_attack_aws":
# Generate MITRE ATT&CK Finding Object
filename = (
@@ -1442,9 +1402,6 @@ def prowler():
output_options.output_filename,
output_options.output_directory,
compliance_overview,
universal_frameworks=universal_frameworks,
provider=provider,
output_formats=args.output_formats,
)
if compliance_overview:
print(
+8 -8
View File
@@ -6426,9 +6426,9 @@
}
],
"Checks": [
"bedrock_agent_guardrail_enabled",
"bedrock_guardrail_prompt_attack_filter_enabled",
"bedrock_guardrail_sensitive_information_filter_enabled"
"bedrock_guardrail_sensitive_information_filter_enabled",
"bedrock_agent_guardrail_enabled"
]
},
{
@@ -6485,9 +6485,9 @@
}
],
"Checks": [
"bedrock_agent_guardrail_enabled",
"bedrock_guardrail_prompt_attack_filter_enabled",
"bedrock_guardrail_sensitive_information_filter_enabled"
"bedrock_guardrail_sensitive_information_filter_enabled",
"bedrock_agent_guardrail_enabled"
]
},
{
@@ -6546,8 +6546,8 @@
}
],
"Checks": [
"bedrock_agent_guardrail_enabled",
"bedrock_guardrail_sensitive_information_filter_enabled"
"bedrock_guardrail_sensitive_information_filter_enabled",
"bedrock_agent_guardrail_enabled"
]
},
{
@@ -6606,8 +6606,8 @@
}
],
"Checks": [
"bedrock_agent_guardrail_enabled",
"bedrock_guardrail_sensitive_information_filter_enabled"
"bedrock_guardrail_sensitive_information_filter_enabled",
"bedrock_agent_guardrail_enabled"
]
},
{
File diff suppressed because it is too large Load Diff
@@ -2894,10 +2894,8 @@
"bedrock_agent_guardrail_enabled",
"bedrock_guardrail_prompt_attack_filter_enabled",
"bedrock_guardrail_sensitive_information_filter_enabled",
"bedrock_guardrails_configured",
"bedrock_model_invocation_logging_enabled",
"bedrock_model_invocation_logs_encryption_enabled",
"bedrock_prompt_management_exists",
"cloudformation_stack_outputs_find_secrets",
"cloudfront_distributions_custom_ssl_certificate",
"cloudfront_distributions_default_root_object",
@@ -2898,10 +2898,8 @@
"bedrock_agent_guardrail_enabled",
"bedrock_guardrail_prompt_attack_filter_enabled",
"bedrock_guardrail_sensitive_information_filter_enabled",
"bedrock_guardrails_configured",
"bedrock_model_invocation_logging_enabled",
"bedrock_model_invocation_logs_encryption_enabled",
"bedrock_prompt_management_exists",
"cloudformation_stack_outputs_find_secrets",
"cloudfront_distributions_custom_ssl_certificate",
"cloudfront_distributions_default_root_object",
+6 -6
View File
@@ -2276,9 +2276,9 @@
"Description": "Ensure that network flow logs are captured and fed into a central log analytics workspace.",
"RationaleStatement": "Network Flow Logs provide valuable insight into the flow of traffic around your network and feed into both Azure Monitor and Azure Sentinel (if in use), permitting thegeneration of visual flow diagrams to aid with analyzing for lateral movement, etc.",
"ImpactStatement": "The impact of configuring NSG Flow logs is primarily one of cost and configuration. If deployed, it will create storage accounts that hold minimal amounts of data on a 5-day lifecycle before feeding to Log Analytics Workspace. This will increase the amount of data stored and used by Azure Monitor.",
"RemediationProcedure": "From Azure Portal Existing NSG flow logs can still be reviewed under Network Watcher > Flow logs. If you already have NSG flow logs configured, ensure they remain enabled and that Traffic Analytics sends data to a Log Analytics Workspace until migration is complete. Azure no longer allows creation of new NSG flow logs after June 30, 2025. For new or migrated deployments, create Virtual network flow logs instead: 1. Navigate to Network Watcher. 2. Select Flow logs. 3. Select + Create. 4. Select the desired Subscription. 5. For Flow log type, select Virtual network. 6. Select + Select target resource. 7. Select a virtual network. 8. Click Confirm selection. 9. Select or create a new Storage Account. 10. Input the retention in days to retain the log. 11. Click Next. 12. Under Analytics, select Version 2, enable Traffic Analytics, and select a Log Analytics Workspace. 13. Select Next. 14. Optionally add Tags. 15. Select Review + create. 16. Select Create.",
"AuditProcedure": "From Azure Portal 1. Navigate to Network Watcher. 2. Select Flow logs. 3. Review existing Network security group flow logs, if any remain, to ensure they are enabled and configured to send logs to a Log Analytics Workspace. 4. Review Virtual network flow logs for new or migrated coverage.",
"AdditionalInformation": "On September 30, 2027, NSG flow logs will be retired, and creating new NSG flow logs has not been possible since June 30, 2025. Azure recommends migrating to virtual network flow logs, which address NSG flow log limitations. After retirement, traffic analytics using NSG flow logs will no longer be supported, and existing NSG flow log resources will be deleted. Previously collected NSG flow log records will remain available per their retention policies.",
"RemediationProcedure": "From Azure Portal 1. Navigate to Network Watcher. 2. Select NSG flow logs. 3. Select + Create. 4. Select the desired Subscription. 5. Select + Select NSG. 6. Select a network security group. 7. Click Confirm selection. 8. Select or create a new Storage Account. 9. Input the retention in days to retain the log. 10. Click Next. 11. Under Configuration, select Version 2. 12. If rich analytics are required, select Enable Traffic Analytics, a processing interval, and a Log Analytics Workspace. 13. Select Next. 14. Optionally add Tags. 15. Select Review + create. 16. Select Create. Warning The remediation policy creates remediation deployment and names them by concatenating the subscription name and the resource group name. The MAXIMUM permitted length of a deployment name is 64 characters. Exceeding this will cause the remediation task to fail.",
"AuditProcedure": "From Azure Portal 1. Navigate to Network Watcher. 2. Select NSG flow logs 3. For each log you wish to audit select it from this view.",
"AdditionalInformation": "",
"DefaultValue": "By default Network Security Group logs are not sent to Log Analytics.",
"References": "https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-portal:https://docs.microsoft.com/en-us/security/benchmark/azure/security-controls-v3-logging-threat-detection#lt-4-enable-network-logging-for-security-investigation"
}
@@ -2702,9 +2702,9 @@
"Description": "Network Security Group Flow Logs should be enabled and the retention period set to greater than or equal to 90 days.",
"RationaleStatement": "Flow logs enable capturing information about IP traffic flowing in and out of network security groups. Logs can be used to check for anomalies and give insight into suspected breaches.",
"ImpactStatement": "This will keep IP traffic logs for longer than 90 days. As a level 2, first determine your need to retain data, then apply your selection here. As this is data stored for longer, your monthly storage costs will increase depending on your data use.",
"RemediationProcedure": "From Azure Portal Existing NSG flow logs can still be reviewed under Network Watcher > Flow logs. If you already have NSG flow logs configured, ensure Status is set to On and Retention (days) is set to 0, 90, or a number greater than 90 until migration is complete. Azure no longer allows creation of new NSG flow logs after June 30, 2025. For new or migrated deployments, configure Virtual network flow logs instead and set Retention days to 0, 90, or a number greater than 90. From Azure CLI Update an existing flow log retention policy with az network watcher flow-log update --location <location> --name <flow-log> --retention <number-of-days>.",
"AuditProcedure": "From Azure Portal 1. Go to Network Watcher. 2. Select Flow logs. 3. Review existing Network security group flow logs, if any remain, and ensure Status is set to On and Retention (days) is set to 0, 90, or a number greater than 90. 4. Review Virtual network flow logs for new or migrated coverage. From Azure CLI az network watcher flow-log list --location <location> --query [*].[name,retentionPolicy,targetResourceId] Ensure each relevant flow log has retention days set to 0, 90, or a number greater than 90.",
"AdditionalInformation": "On September 30, 2027, NSG flow logs will be retired, and creating new NSG flow logs has not been possible since June 30, 2025. Azure recommends migrating to virtual network flow logs, which address NSG flow log limitations. After retirement, traffic analytics using NSG flow logs will no longer be supported, and existing NSG flow log resources will be deleted. Previously collected NSG flow log records will remain available per their retention policies.",
"RemediationProcedure": "From Azure Portal 1. Go to Network Watcher 2. Select NSG flow logs blade in the Logs section 3. Select each Network Security Group from the list 4. Ensure Status is set to On 5. Ensure Retention (days) setting greater than 90 days 6. Select your storage account in the Storage account field 7. Select Save From Azure CLI Enable the NSG flow logs and set the Retention (days) to greater than or equal to 90 days. az network watcher flow-log configure --nsg <NameorID of the Network Security Group> --enabled true --resource-group <resourceGroupName> --retention 91 -- storage-account <NameorID of the storage account to save flow logs>",
"AuditProcedure": "From Azure Portal 1. Go to Network Watcher 2. Select NSG flow logs blade in the Logs section 3. Select each Network Security Group from the list 4. Ensure Status is set to On 5. Ensure Retention (days) setting greater than 90 days From Azure CLI az network watcher flow-log show --resource-group <resourceGroup> --nsg <NameorID of the NetworkSecurityGroup> --query 'retentionPolicy' Ensure that enabled is set to true and days is set to greater then or equal to 90.",
"AdditionalInformation": "",
"DefaultValue": "By default, Network Security Group Flow Logs are disabled.",
"References": "https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-overview:https://docs.microsoft.com/en-us/cli/azure/network/watcher/flow-log?view=azure-cli-latest:https://docs.microsoft.com/en-us/security/benchmark/azure/security-controls-v3-logging-threat-detection#lt-6-configure-log-storage-retention"
}
+6 -6
View File
@@ -2241,9 +2241,9 @@
"Description": "Ensure that network flow logs are captured and fed into a central log analytics workspace.",
"RationaleStatement": "Network Flow Logs provide valuable insight into the flow of traffic around your network and feed into both Azure Monitor and Azure Sentinel (if in use), permitting the generation of visual flow diagrams to aid with analyzing for lateral movement, etc.",
"ImpactStatement": "The impact of configuring NSG Flow logs is primarily one of cost and configuration. If deployed, it will create storage accounts that hold minimal amounts of data on a 5-day lifecycle before feeding to Log Analytics Workspace. This will increase the amount of data stored and used by Azure Monitor.",
"RemediationProcedure": "**From Azure Portal** Existing NSG flow logs can still be reviewed under `Network Watcher` > `Flow logs`. If you already have NSG flow logs configured, ensure they remain enabled and that `Traffic Analytics` sends data to a `Log Analytics Workspace` until migration is complete. Azure no longer allows creation of new NSG flow logs after June 30, 2025. For new or migrated deployments, create `Virtual network` flow logs instead: 1. Navigate to `Network Watcher`. 1. Select `Flow logs`. 1. Select `+ Create`. 1. Select the desired Subscription. 1. For `Flow log type`, select `Virtual network`. 1. Select `+ Select target resource`. 1. Select `Virtual network`. 1. Select a virtual network. 1. Click `Confirm selection`. 1. Select or create a new Storage Account. 1. Input the retention in days to retain the log. 1. Click `Next`. 1. Under `Analytics`, for `Flow log version`, select `Version 2`. 1. Check the box next to `Enable traffic analytics`. 1. Select a processing interval. 1. Select a `Log Analytics Workspace`. 1. Select `Next`. 1. Optionally add Tags. 1. Select `Review + create`. 1. Select `Create`.",
"AuditProcedure": "**From Azure Portal** 1. Navigate to `Network Watcher`. 1. Select `Flow logs`. 1. Review existing `Network security group` flow logs, if any remain, to ensure they are enabled and configured to send logs to a `Log Analytics Workspace`. 1. Review `Virtual network` flow logs for new or migrated coverage. **From Azure Policy** If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure. If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions - **Policy ID:** [27960feb-a23c-4577-8d36-ef8b5f35e0be](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F27960feb-a23c-4577-8d36-ef8b5f35e0be) **- Name:** 'All flow log resources should be in enabled state' - **Policy ID:** [c251913d-7d24-4958-af87-478ed3b9ba41](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2Fc251913d-7d24-4958-af87-478ed3b9ba41) **- Name:** 'Flow logs should be configured for every network security group' - **Policy ID:** [4c3c6c5f-0d47-4402-99b8-aa543dd8bcee](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F4c3c6c5f-0d47-4402-99b8-aa543dd8bcee) **- Name:** 'Flow logs should be configured for every virtual network'",
"AdditionalInformation": "On September 30, 2027, NSG flow logs will be retired, and creating new NSG flow logs has not been possible since June 30, 2025. Azure recommends migrating to virtual network flow logs, which address NSG flow log limitations. After retirement, traffic analytics using NSG flow logs will no longer be supported, and existing NSG flow log resources will be deleted. Previously collected NSG flow log records will remain available per their retention policies.",
"RemediationProcedure": "**From Azure Portal** 1. Navigate to `Network Watcher`. 1. Select `NSG flow logs`. 1. Select `+ Create`. 1. Select the desired Subscription. 1. Select `+ Select NSG`. 1. Select a network security group. 1. Click `Confirm selection`. 1. Select or create a new Storage Account. 1. Input the retention in days to retain the log. 1. Click `Next`. 1. Under `Configuration`, select `Version 2`. 1. If rich analytics are required, select `Enable Traffic Analytics`, a processing interval, and a `Log Analytics Workspace`. 1. Select `Next`. 1. Optionally add Tags. 1. Select `Review + create`. 1. Select `Create`. ***Warning*** The remediation policy creates remediation deployment and names them by concatenating the subscription name and the resource group name. The MAXIMUM permitted length of a deployment name is 64 characters. Exceeding this will cause the remediation task to fail.",
"AuditProcedure": "**From Azure Portal** 1. Navigate to `Network Watcher`. 1. Select `NSG flow logs` 1. For each log you wish to audit select it from this view. **From Azure Policy** If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure. If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions - **Policy ID:** [27960feb-a23c-4577-8d36-ef8b5f35e0be](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F27960feb-a23c-4577-8d36-ef8b5f35e0be) **- Name:** 'All flow log resources should be in enabled state' - **Policy ID:** [c251913d-7d24-4958-af87-478ed3b9ba41](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2Fc251913d-7d24-4958-af87-478ed3b9ba41) **- Name:** 'Flow logs should be configured for every network security group' - **Policy ID:** [4c3c6c5f-0d47-4402-99b8-aa543dd8bcee](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F4c3c6c5f-0d47-4402-99b8-aa543dd8bcee) **- Name:** 'Flow logs should be configured for every virtual network'",
"AdditionalInformation": "",
"DefaultValue": "By default Network Security Group logs are not sent to Log Analytics.",
"References": "https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-portal:https://learn.microsoft.com/en-us/security/benchmark/azure/mcsb-logging-threat-detection#lt-4-enable-network-logging-for-security-investigation"
}
@@ -2627,9 +2627,9 @@
"Description": "Network Security Group Flow Logs should be enabled and the retention period set to greater than or equal to 90 days.",
"RationaleStatement": "Flow logs enable capturing information about IP traffic flowing in and out of network security groups. Logs can be used to check for anomalies and give insight into suspected breaches.",
"ImpactStatement": "This will keep IP traffic logs for longer than 90 days. As a level 2, first determine your need to retain data, then apply your selection here. As this is data stored for longer, your monthly storage costs will increase depending on your data use.",
"RemediationProcedure": "**From Azure Portal** Existing NSG flow logs can still be reviewed under `Network Watcher` > `Flow logs`. If you already have NSG flow logs configured, ensure `Status` is set to `On` and `Retention (days)` is set to `0`, `90`, or a number greater than `90` until migration is complete. Azure no longer allows creation of new NSG flow logs after June 30, 2025. For new or migrated deployments, configure `Virtual network` flow logs instead and set `Retention days` to `0`, `90`, or a number greater than `90`. **From Azure CLI** Update an existing flow log retention policy with: ``` az network watcher flow-log update --location <location> --name <flow-log> --retention <number-of-days> ```",
"AuditProcedure": "**From Azure Portal** 1. Go to `Network Watcher`. 1. Select `Flow logs`. 1. Review existing `Network security group` flow logs, if any remain, and ensure `Status` is set to `On` and `Retention (days)` is set to `0`, `90`, or a number greater than `90`. 1. Review `Virtual network` flow logs for new or migrated coverage. **From Azure CLI** ``` az network watcher flow-log list --location <location> --query [*].[name,retentionPolicy,targetResourceId] ``` Ensure each relevant flow log has retention days set to `0`, `90`, or a number greater than `90`. **From Azure Policy** If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure. If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions - **Policy ID:** [5e1cd26a-5090-4fdb-9d6a-84a90335e22d](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F5e1cd26a-5090-4fdb-9d6a-84a90335e22d) **- Name:** 'Configure network security groups to use specific workspace, storage account and flowlog retention policy for traffic analytics'",
"AdditionalInformation": "On September 30, 2027, NSG flow logs will be retired, and creating new NSG flow logs has not been possible since June 30, 2025. Azure recommends migrating to virtual network flow logs, which address NSG flow log limitations. After retirement, traffic analytics using NSG flow logs will no longer be supported, and existing NSG flow log resources will be deleted. Previously collected NSG flow log records will remain available per their retention policies.",
"RemediationProcedure": "**From Azure Portal** 1. Go to `Network Watcher` 2. Select `NSG flow logs` blade in the Logs section 3. Select each Network Security Group from the list 4. Ensure `Status` is set to `On` 5. Ensure `Retention (days)` setting `greater than 90 days` 6. Select your storage account in the `Storage account` field 7. Select `Save` **From Azure CLI** Enable the `NSG flow logs` and set the Retention (days) to greater than or equal to 90 days. ``` az network watcher flow-log configure --nsg <NameorID of the Network Security Group> --enabled true --resource-group <resourceGroupName> --retention 91 --storage-account <NameorID of the storage account to save flow logs> ```",
"AuditProcedure": "**From Azure Portal** 1. Go to `Network Watcher` 2. Select `NSG flow logs` blade in the Logs section 3. Select each Network Security Group from the list 4. Ensure `Status` is set to `On` 5. Ensure `Retention (days)` setting `greater than 90 days` **From Azure CLI** ``` az network watcher flow-log show --resource-group <resourceGroup> --nsg <NameorID of the NetworkSecurityGroup> --query 'retentionPolicy' ``` Ensure that `enabled` is set to `true` and `days` is set to `greater then or equal to 90`. **From Azure Policy** If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure. If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions - **Policy ID:** [5e1cd26a-5090-4fdb-9d6a-84a90335e22d](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F5e1cd26a-5090-4fdb-9d6a-84a90335e22d) **- Name:** 'Configure network security groups to use specific workspace, storage account and flowlog retention policy for traffic analytics'",
"AdditionalInformation": "",
"DefaultValue": "By default, Network Security Group Flow Logs are `disabled`.",
"References": "https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-overview:https://docs.microsoft.com/en-us/cli/azure/network/watcher/flow-log?view=azure-cli-latest:https://learn.microsoft.com/en-us/security/benchmark/azure/mcsb-logging-threat-detection#lt-6-configure-log-storage-retention"
}
+6 -6
View File
@@ -2548,9 +2548,9 @@
"Description": "Ensure that network flow logs are captured and fed into a central log analytics workspace.",
"RationaleStatement": "Network Flow Logs provide valuable insight into the flow of traffic around your network and feed into both Azure Monitor and Azure Sentinel (if in use), permitting the generation of visual flow diagrams to aid with analyzing for lateral movement, etc.",
"ImpactStatement": "The impact of configuring NSG Flow logs is primarily one of cost and configuration. If deployed, it will create storage accounts that hold minimal amounts of data on a 5-day lifecycle before feeding to Log Analytics Workspace.This will increase the amount of data stored and used by Azure Monitor.",
"RemediationProcedure": "**Remediate from Azure Portal** Existing NSG flow logs can still be reviewed under `Network Watcher` > `Flow logs`. If you already have NSG flow logs configured, ensure they remain enabled and that `Traffic Analytics` sends data to a `Log Analytics Workspace` until migration is complete. Azure no longer allows creation of new NSG flow logs after June 30, 2025. For new or migrated deployments, create `Virtual network` flow logs instead: 1. Navigate to `Network Watcher`.1. Under `Logs`, select `Flow logs`.1. Select `+ Create`.1. Select the desired Subscription.1. For `Flow log type`, select `Virtual network`.1. Select `+ Select target resource`.1. Select `Virtual network`.1. Select a virtual network.1. Click `Confirm selection`.1. Select or create a new Storage Account.1. If using a v2 storage account, input the retention in days to retain the log.1. Click `Next`.1. Under `Analytics`, for `Flow log version`, select `Version 2`.1. Check the box next to `Enable traffic analytics`.1. Select a processing interval.1. Select a `Log Analytics Workspace`.1. Select `Next`.1. Optionally add Tags.1. Select `Review + create`.1. Select `Create`.",
"AuditProcedure": "**Audit from Azure Portal**1. Navigate to `Network Watcher`.1. Under `Logs`, select `Flow logs`.1. Click `Add filter`.1. From the `Filter` drop-down, select `Flow log type`.1. Review existing `Network security group` flow logs, if any remain, to ensure they are enabled and configured to send logs to a `Log Analytics Workspace`.1. Review `Virtual network` flow logs for new or migrated coverage.**Audit from Azure Policy**If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure.If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions- **Policy ID:** [27960feb-a23c-4577-8d36-ef8b5f35e0be](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F27960feb-a23c-4577-8d36-ef8b5f35e0be) **- Name:** 'All flow log resources should be in enabled state'- **Policy ID:** [c251913d-7d24-4958-af87-478ed3b9ba41](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2Fc251913d-7d24-4958-af87-478ed3b9ba41) **- Name:** 'Flow logs should be configured for every network security group'- **Policy ID:** [4c3c6c5f-0d47-4402-99b8-aa543dd8bcee](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F4c3c6c5f-0d47-4402-99b8-aa543dd8bcee) **- Name:** 'Flow logs should be configured for every virtual network'",
"AdditionalInformation": "On September 30, 2027, NSG flow logs will be retired, and creating new NSG flow logs has not been possible since June 30, 2025. Azure recommends migrating to virtual network flow logs, which address NSG flow log limitations. After retirement, traffic analytics using NSG flow logs will no longer be supported, and existing NSG flow log resources will be deleted. Previously collected NSG flow log records will remain available per their retention policies.",
"RemediationProcedure": "**Remediate from Azure Portal**1. Navigate to `Network Watcher`.1. Under `Logs`, select `Flow logs`.1. Select `+ Create`.1. Select the desired Subscription.1. For `Flow log type`, select `Network security group`.1. Select `+ Select target resource`.1. Select `Network security group`.1. Select a network security group.1. Click `Confirm selection`.1. Select or create a new Storage Account.1. If using a v2 storage account, input the retention in days to retain the log.1. Click `Next`.1. Under `Analytics`, for `Flow log version`, select `Version 2`.1. Check the box next to `Enable traffic analytics`.1. Select a processing interval.1. Select a `Log Analytics Workspace`.1. Select `Next`.1. Optionally add Tags.1. Select `Review + create`.1. Select `Create`.***Warning***The remediation policy creates remediation deployment and names them by concatenating the subscription name and the resource group name. The MAXIMUM permitted length of a deployment name is 64 characters. Exceeding this will cause the remediation task to fail.",
"AuditProcedure": "**Audit from Azure Portal**1. Navigate to `Network Watcher`.1. Under `Logs`, select `Flow logs`.1. Click `Add filter`.1. From the `Filter` drop-down, select `Flow log type`.1. From the `Value` drop-down, check `Network security group` only.1. Click `Apply`.1. Ensure that at least one network security group flow log is listed and is configured to send logs to a `Log Analytics Workspace`.**Audit from Azure Policy**If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure.If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions- **Policy ID:** [27960feb-a23c-4577-8d36-ef8b5f35e0be](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F27960feb-a23c-4577-8d36-ef8b5f35e0be) **- Name:** 'All flow log resources should be in enabled state'- **Policy ID:** [c251913d-7d24-4958-af87-478ed3b9ba41](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2Fc251913d-7d24-4958-af87-478ed3b9ba41) **- Name:** 'Flow logs should be configured for every network security group'- **Policy ID:** [4c3c6c5f-0d47-4402-99b8-aa543dd8bcee](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F4c3c6c5f-0d47-4402-99b8-aa543dd8bcee) **- Name:** 'Flow logs should be configured for every virtual network'",
"AdditionalInformation": "",
"DefaultValue": "By default Network Security Group logs are not sent to Log Analytics.",
"References": "https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-portal:https://learn.microsoft.com/en-us/security/benchmark/azure/mcsb-logging-threat-detection#lt-4-enable-network-logging-for-security-investigation"
}
@@ -2934,9 +2934,9 @@
"Description": "Network Security Group Flow Logs should be enabled and the retention period set to greater than or equal to 90 days.",
"RationaleStatement": "Flow logs enable capturing information about IP traffic flowing in and out of network security groups. Logs can be used to check for anomalies and give insight into suspected breaches.",
"ImpactStatement": "This will keep IP traffic logs for longer than 90 days. As a level 2, first determine your need to retain data, then apply your selection here. As this is data stored for longer, your monthly storage costs will increase depending on your data use.",
"RemediationProcedure": "**Remediate from Azure Portal** Existing NSG flow logs can still be reviewed under `Network Watcher` > `Flow logs`. If you already have NSG flow logs configured, ensure `Status` is set to `On` and `Retention (days)` is set to `0`, `90`, or a number greater than `90` until migration is complete. Azure no longer allows creation of new NSG flow logs after June 30, 2025. For new or migrated deployments, configure `Virtual network` flow logs instead and set `Retention days` to `0`, `90`, or a number greater than `90`.**Remediate from Azure CLI**Update an existing flow log retention policy with:```az network watcher flow-log update --location <location> --name <flow-log> --retention <number-of-days>```",
"AuditProcedure": "**Audit from Azure Portal**1. Go to `Network Watcher`.1. Select `Flow logs`.1. Review existing `Network security group` flow logs, if any remain, and ensure `Status` is set to `On` and `Retention (days)` is set to `0`, `90`, or a number greater than `90`.1. Review `Virtual network` flow logs for new or migrated coverage.**Audit from Azure CLI**```az network watcher flow-log list --location <location> --query [*].[name,retentionPolicy,targetResourceId]```Ensure each relevant flow log has retention days set to `0`, `90`, or a number greater than `90`.**Audit from Azure Policy**If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure.If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions- **Policy ID:** [5e1cd26a-5090-4fdb-9d6a-84a90335e22d](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F5e1cd26a-5090-4fdb-9d6a-84a90335e22d) **- Name:** 'Configure network security groups to use specific workspace, storage account and flowlog retention policy for traffic analytics'",
"AdditionalInformation": "On September 30, 2027, NSG flow logs will be retired, and creating new NSG flow logs has not been possible since June 30, 2025. Azure recommends migrating to virtual network flow logs, which address NSG flow log limitations. After retirement, traffic analytics using NSG flow logs will no longer be supported, and existing NSG flow log resources will be deleted. Previously collected NSG flow log records will remain available per their retention policies.",
"RemediationProcedure": "**Remediate from Azure Portal**1. Go to `Network Watcher`2. Select `NSG flow logs` blade in the Logs section3. Select each Network Security Group from the list4. Ensure `Status` is set to `On`5. Ensure `Retention (days)` setting `greater than 90 days`6. Select your storage account in the `Storage account` field7. Select `Save`**Remediate from Azure CLI**Enable the `NSG flow logs` and set the Retention (days) to greater than or equal to 90 days.```az network watcher flow-log configure --nsg <NameorID of the Network Security Group> --enabled true --resource-group <resourceGroupName> --retention 91 --storage-account <NameorID of the storage account to save flow logs>```",
"AuditProcedure": "**Audit from Azure Portal**1. Go to `Network Watcher`2. Select `NSG flow logs` blade in the Logs section3. Select each Network Security Group from the list4. Ensure `Status` is set to `On`5. Ensure `Retention (days)` setting `greater than 90 days`**Audit from Azure CLI**```az network watcher flow-log show --resource-group <resourceGroup> --nsg <NameorID of the NetworkSecurityGroup> --query 'retentionPolicy'```Ensure that `enabled` is set to `true` and `days` is set to `greater then or equal to 90`.**Audit from Azure Policy**If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure.If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions- **Policy ID:** [5e1cd26a-5090-4fdb-9d6a-84a90335e22d](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F5e1cd26a-5090-4fdb-9d6a-84a90335e22d) **- Name:** 'Configure network security groups to use specific workspace, storage account and flowlog retention policy for traffic analytics'",
"AdditionalInformation": "",
"DefaultValue": "By default, Network Security Group Flow Logs are `disabled`.",
"References": "https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-overview:https://docs.microsoft.com/en-us/cli/azure/network/watcher/flow-log?view=azure-cli-latest:https://learn.microsoft.com/en-us/security/benchmark/azure/mcsb-logging-threat-detection#lt-6-configure-log-storage-retention"
}
+6 -6
View File
@@ -1302,9 +1302,9 @@
"Description": "Ensure that network flow logs are captured and fed into a central log analytics workspace. **Retirement Notice** On September 30, 2027, network security group (NSG) flow logs will be retired. Starting June 30, 2025, it will no longer be possible to create new NSG flow logs. Azure recommends migrating to virtual network flow logs. Review https://azure.microsoft.com/en-gb/updates?id=Azure-NSG-flow-logs-Retirement for more information. For virtual network flow logs, consider applying the recommendation `Ensure that virtual network flow logs are captured and sent to Log Analytics` in this section.",
"RationaleStatement": "Network Flow Logs provide valuable insight into the flow of traffic around your network and feed into both Azure Monitor and Azure Sentinel (if in use), permitting the generation of visual flow diagrams to aid with analyzing for lateral movement, etc.",
"ImpactStatement": "The impact of configuring NSG Flow logs is primarily one of cost and configuration. If deployed, it will create storage accounts that hold minimal amounts of data on a 5-day lifecycle before feeding to Log Analytics Workspace. This will increase the amount of data stored and used by Azure Monitor.",
"RemediationProcedure": "**Remediate from Azure Portal** Existing NSG flow logs can still be reviewed under `Network Watcher` > `Flow logs`. If you already have NSG flow logs configured, ensure they remain enabled and that `Traffic Analytics` sends data to a `Log Analytics Workspace` until migration is complete. Azure no longer allows creation of new NSG flow logs after June 30, 2025. For new or migrated deployments, create `Virtual network` flow logs instead: 1. Navigate to `Network Watcher`. 1. Under `Logs`, select `Flow logs`. 1. Select `+ Create`. 1. Select the desired Subscription. 1. For `Flow log type`, select `Virtual network`. 1. Select `+ Select target resource`. 1. Select `Virtual network`. 1. Select a virtual network. 1. Click `Confirm selection`. 1. Select or create a new Storage Account. 1. If using a v2 storage account, input the retention in days to retain the log. 1. Click `Next`. 1. Under `Analytics`, for `Flow log version`, select `Version 2`. 1. Check the box next to `Enable traffic analytics`. 1. Select a processing interval. 1. Select a `Log Analytics Workspace`. 1. Select `Next`. 1. Optionally add Tags. 1. Select `Review + create`. 1. Select `Create`.",
"AuditProcedure": "**Audit from Azure Portal** 1. Navigate to `Network Watcher`. 1. Under `Logs`, select `Flow logs`. 1. Click `Add filter`. 1. From the `Filter` drop-down, select `Flow log type`. 1. Review existing `Network security group` flow logs, if any remain, to ensure they are enabled and configured to send logs to a `Log Analytics Workspace`. 1. Review `Virtual network` flow logs for new or migrated coverage. **Audit from Azure Policy** If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure. If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions - **Policy ID:** [27960feb-a23c-4577-8d36-ef8b5f35e0be](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F27960feb-a23c-4577-8d36-ef8b5f35e0be) **- Name:** 'All flow log resources should be in enabled state' - **Policy ID:** [c251913d-7d24-4958-af87-478ed3b9ba41](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2Fc251913d-7d24-4958-af87-478ed3b9ba41) **- Name:** 'Flow logs should be configured for every network security group' - **Policy ID:** [4c3c6c5f-0d47-4402-99b8-aa543dd8bcee](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F4c3c6c5f-0d47-4402-99b8-aa543dd8bcee) **- Name:** 'Flow logs should be configured for every virtual network'",
"AdditionalInformation": "On September 30, 2027, NSG flow logs will be retired, and creating new NSG flow logs has not been possible since June 30, 2025. Azure recommends migrating to virtual network flow logs, which address NSG flow log limitations. After retirement, traffic analytics using NSG flow logs will no longer be supported, and existing NSG flow log resources will be deleted. Previously collected NSG flow log records will remain available per their retention policies. For details, see the official announcement: https://azure.microsoft.com/en-gb/updates?id=Azure-NSG-flow-logs-Retirement.",
"RemediationProcedure": "**Remediate from Azure Portal** 1. Navigate to `Network Watcher`. 1. Under `Logs`, select `Flow logs`. 1. Select `+ Create`. 1. Select the desired Subscription. 1. For `Flow log type`, select `Network security group`. 1. Select `+ Select target resource`. 1. Select `Network security group`. 1. Select a network security group. 1. Click `Confirm selection`. 1. Select or create a new Storage Account. 1. If using a v2 storage account, input the retention in days to retain the log. 1. Click `Next`. 1. Under `Analytics`, for `Flow log version`, select `Version 2`. 1. Check the box next to `Enable traffic analytics`. 1. Select a processing interval. 1. Select a `Log Analytics Workspace`. 1. Select `Next`. 1. Optionally add Tags. 1. Select `Review + create`. 1. Select `Create`. ***Warning*** The remediation policy creates remediation deployment and names them by concatenating the subscription name and the resource group name. The MAXIMUM permitted length of a deployment name is 64 characters. Exceeding this will cause the remediation task to fail.",
"AuditProcedure": "**Audit from Azure Portal** 1. Navigate to `Network Watcher`. 1. Under `Logs`, select `Flow logs`. 1. Click `Add filter`. 1. From the `Filter` drop-down, select `Flow log type`. 1. From the `Value` drop-down, check `Network security group` only. 1. Click `Apply`. 1. Ensure that at least one network security group flow log is listed and is configured to send logs to a `Log Analytics Workspace`. **Audit from Azure Policy** If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure. If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions - **Policy ID:** [27960feb-a23c-4577-8d36-ef8b5f35e0be](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F27960feb-a23c-4577-8d36-ef8b5f35e0be) **- Name:** 'All flow log resources should be in enabled state' - **Policy ID:** [c251913d-7d24-4958-af87-478ed3b9ba41](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2Fc251913d-7d24-4958-af87-478ed3b9ba41) **- Name:** 'Flow logs should be configured for every network security group' - **Policy ID:** [4c3c6c5f-0d47-4402-99b8-aa543dd8bcee](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F4c3c6c5f-0d47-4402-99b8-aa543dd8bcee) **- Name:** 'Flow logs should be configured for every virtual network'",
"AdditionalInformation": "",
"References": "https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-portal:https://learn.microsoft.com/en-us/security/benchmark/azure/mcsb-logging-threat-detection#lt-4-enable-network-logging-for-security-investigation",
"DefaultValue": "By default Network Security Group logs are not sent to Log Analytics."
}
@@ -1789,9 +1789,9 @@
"Description": "Network Security Group Flow Logs should be enabled and the retention period set to greater than or equal to 90 days. **Retirement Notice** On September 30, 2027, network security group (NSG) flow logs will be retired. Starting June 30, 2025, it will no longer be possible to create new NSG flow logs. Azure recommends migrating to virtual network flow logs. Review https://azure.microsoft.com/en-gb/updates?id=Azure-NSG-flow-logs-Retirement for more information. For virtual network flow logs, consider applying the recommendation `Ensure that virtual network flow log retention days is set to greater than or equal to 90` in this section.",
"RationaleStatement": "Flow logs enable capturing information about IP traffic flowing in and out of network security groups. Logs can be used to check for anomalies and give insight into suspected breaches.",
"ImpactStatement": "This will keep IP traffic logs for longer than 90 days. As a level 2, first determine your need to retain data, then apply your selection here. As this is data stored for longer, your monthly storage costs will increase depending on your data use.",
"RemediationProcedure": "**Remediate from Azure Portal** Existing NSG flow logs can still be reviewed under `Network Watcher` > `Flow logs`. If you already have NSG flow logs configured, ensure `Status` is set to `On` and `Retention (days)` is set to `0`, `90`, or a number greater than `90` until migration is complete. Azure no longer allows creation of new NSG flow logs after June 30, 2025. For new or migrated deployments, configure `Virtual network` flow logs instead and set `Retention days` to `0`, `90`, or a number greater than `90`. **Remediate from Azure CLI** Update an existing flow log retention policy with: ``` az network watcher flow-log update --location <location> --name <flow-log> --retention <number-of-days> ```",
"AuditProcedure": "**Audit from Azure Portal** 1. Go to `Network Watcher`. 1. Select `Flow logs`. 1. Review existing `Network security group` flow logs, if any remain, and ensure `Status` is set to `On` and `Retention (days)` is set to `0`, `90`, or a number greater than `90`. 1. Review `Virtual network` flow logs for new or migrated coverage. **Audit from Azure CLI** ``` az network watcher flow-log list --location <location> --query [*].[name,retentionPolicy,targetResourceId] ``` Ensure each relevant flow log has retention days set to `0`, `90`, or a number greater than `90`. **Audit from Azure Policy** If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure. If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions - **Policy ID:** [5e1cd26a-5090-4fdb-9d6a-84a90335e22d](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F5e1cd26a-5090-4fdb-9d6a-84a90335e22d) **- Name:** 'Configure network security groups to use specific workspace, storage account and flowlog retention policy for traffic analytics'",
"AdditionalInformation": "On September 30, 2027, NSG flow logs will be retired, and creating new NSG flow logs has not been possible since June 30, 2025. Azure recommends migrating to virtual network flow logs, which address NSG flow log limitations. After retirement, traffic analytics using NSG flow logs will no longer be supported, and existing NSG flow log resources will be deleted. Previously collected NSG flow log records will remain available per their retention policies. For details, see the official announcement: https://azure.microsoft.com/en-gb/updates?id=Azure-NSG-flow-logs-Retirement.",
"RemediationProcedure": "**Remediate from Azure Portal** 1. Go to `Network Watcher` 2. Select `NSG flow logs` blade in the Logs section 3. Select each Network Security Group from the list 4. Ensure `Status` is set to `On` 5. Ensure `Retention (days)` setting `greater than 90 days` 6. Select your storage account in the `Storage account` field 7. Select `Save` **Remediate from Azure CLI** Enable the `NSG flow logs` and set the Retention (days) to greater than or equal to 90 days. ``` az network watcher flow-log configure --nsg <NameorID of the Network Security Group> --enabled true --resource-group <resourceGroupName> --retention 91 --storage-account <NameorID of the storage account to save flow logs> ```",
"AuditProcedure": "**Audit from Azure Portal** 1. Go to `Network Watcher` 2. Select `NSG flow logs` blade in the Logs section 3. Select each Network Security Group from the list 4. Ensure `Status` is set to `On` 5. Ensure `Retention (days)` setting `greater than 90 days` **Audit from Azure CLI** ``` az network watcher flow-log show --resource-group <resourceGroup> --nsg <NameorID of the NetworkSecurityGroup> --query 'retentionPolicy' ``` Ensure that `enabled` is set to `true` and `days` is set to `greater then or equal to 90`. **Audit from Azure Policy** If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure. If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions - **Policy ID:** [5e1cd26a-5090-4fdb-9d6a-84a90335e22d](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F5e1cd26a-5090-4fdb-9d6a-84a90335e22d) **- Name:** 'Configure network security groups to use specific workspace, storage account and flowlog retention policy for traffic analytics'",
"AdditionalInformation": "",
"References": "https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-overview:https://docs.microsoft.com/en-us/cli/azure/network/watcher/flow-log?view=azure-cli-latest:https://learn.microsoft.com/en-us/security/benchmark/azure/mcsb-logging-threat-detection#lt-6-configure-log-storage-retention",
"DefaultValue": "By default, Network Security Group Flow Logs are `disabled`."
}
+3 -3
View File
@@ -1292,9 +1292,9 @@
"Description": "Ensure that network flow logs are captured and fed into a central log analytics workspace. **Retirement Notice** On September 30, 2027, network security group (NSG) flow logs will be retired. Starting June 30, 2025, it will no longer be possible to create new NSG flow logs. Azure recommends migrating to virtual network flow logs. Review https://azure.microsoft.com/en-gb/updates?id=Azure-NSG-flow-logs-Retirement for more information. For virtual network flow logs, consider applying the recommendation `Ensure that virtual network flow logs are captured and sent to Log Analytics` in this section.",
"RationaleStatement": "Network Flow Logs provide valuable insight into the flow of traffic around your network and feed into both Azure Monitor and Azure Sentinel (if in use), permitting the generation of visual flow diagrams to aid with analyzing for lateral movement, etc.",
"ImpactStatement": "The impact of configuring NSG Flow logs is primarily one of cost and configuration. If deployed, it will create storage accounts that hold minimal amounts of data on a 5-day lifecycle before feeding to Log Analytics Workspace. This will increase the amount of data stored and used by Azure Monitor.",
"RemediationProcedure": "**Remediate from Azure Portal** Existing NSG flow logs can still be reviewed under `Network Watcher` > `Flow logs`. If you already have NSG flow logs configured, ensure they remain enabled and that `Traffic Analytics` sends data to a `Log Analytics Workspace` until migration is complete. Azure no longer allows creation of new NSG flow logs after June 30, 2025. For new or migrated deployments, create `Virtual network` flow logs instead: 1. Navigate to `Network Watcher`. 1. Under `Logs`, select `Flow logs`. 1. Select `+ Create`. 1. Select the desired Subscription. 1. For `Flow log type`, select `Virtual network`. 1. Select `+ Select target resource`. 1. Select `Virtual network`. 1. Select a virtual network. 1. Click `Confirm selection`. 1. Select or create a new Storage Account. 1. If using a v2 storage account, input the retention in days to retain the log. 1. Click `Next`. 1. Under `Analytics`, for `Flow log version`, select `Version 2`. 1. Check the box next to `Enable traffic analytics`. 1. Select a processing interval. 1. Select a `Log Analytics Workspace`. 1. Select `Next`. 1. Optionally add Tags. 1. Select `Review + create`. 1. Select `Create`.",
"AuditProcedure": "**Audit from Azure Portal** 1. Navigate to `Network Watcher`. 1. Under `Logs`, select `Flow logs`. 1. Click `Add filter`. 1. From the `Filter` drop-down, select `Flow log type`. 1. Review existing `Network security group` flow logs, if any remain, to ensure they are enabled and configured to send logs to a `Log Analytics Workspace`. 1. Review `Virtual network` flow logs for new or migrated coverage. **Audit from Azure Policy** If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure. If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions - **Policy ID:** [27960feb-a23c-4577-8d36-ef8b5f35e0be](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F27960feb-a23c-4577-8d36-ef8b5f35e0be) **- Name:** 'All flow log resources should be in enabled state' - **Policy ID:** [c251913d-7d24-4958-af87-478ed3b9ba41](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2Fc251913d-7d24-4958-af87-478ed3b9ba41) **- Name:** 'Flow logs should be configured for every network security group' - **Policy ID:** [4c3c6c5f-0d47-4402-99b8-aa543dd8bcee](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F4c3c6c5f-0d47-4402-99b8-aa543dd8bcee) **- Name:** 'Flow logs should be configured for every virtual network'",
"AdditionalInformation": "On September 30, 2027, NSG flow logs will be retired, and creating new NSG flow logs has not been possible since June 30, 2025. Azure recommends migrating to virtual network flow logs, which address NSG flow log limitations. After retirement, traffic analytics using NSG flow logs will no longer be supported, and existing NSG flow log resources will be deleted. Previously collected NSG flow log records will remain available per their retention policies. For details, see the official announcement: https://azure.microsoft.com/en-gb/updates?id=Azure-NSG-flow-logs-Retirement.",
"RemediationProcedure": "**Remediate from Azure Portal** 1. Navigate to `Network Watcher`. 1. Under `Logs`, select `Flow logs`. 1. Select `+ Create`. 1. Select the desired Subscription. 1. For `Flow log type`, select `Network security group`. 1. Select `+ Select target resource`. 1. Select `Network security group`. 1. Select a network security group. 1. Click `Confirm selection`. 1. Select or create a new Storage Account. 1. If using a v2 storage account, input the retention in days to retain the log. 1. Click `Next`. 1. Under `Analytics`, for `Flow log version`, select `Version 2`. 1. Check the box next to `Enable traffic analytics`. 1. Select a processing interval. 1. Select a `Log Analytics Workspace`. 1. Select `Next`. 1. Optionally add Tags. 1. Select `Review + create`. 1. Select `Create`. ***Warning*** The remediation policy creates remediation deployment and names them by concatenating the subscription name and the resource group name. The MAXIMUM permitted length of a deployment name is 64 characters. Exceeding this will cause the remediation task to fail.",
"AuditProcedure": "**Audit from Azure Portal** 1. Navigate to `Network Watcher`. 1. Under `Logs`, select `Flow logs`. 1. Click `Add filter`. 1. From the `Filter` drop-down, select `Flow log type`. 1. From the `Value` drop-down, check `Network security group` only. 1. Click `Apply`. 1. Ensure that at least one network security group flow log is listed and is configured to send logs to a `Log Analytics Workspace`. **Audit from Azure Policy** If referencing a digital copy of this Benchmark, clicking a Policy ID will open a link to the associated Policy definition in Azure. If referencing a printed copy, you can search Policy IDs from this URL: https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyMenuBlade/~/Definitions - **Policy ID:** [27960feb-a23c-4577-8d36-ef8b5f35e0be](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F27960feb-a23c-4577-8d36-ef8b5f35e0be) **- Name:** 'All flow log resources should be in enabled state' - **Policy ID:** [c251913d-7d24-4958-af87-478ed3b9ba41](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2Fc251913d-7d24-4958-af87-478ed3b9ba41) **- Name:** 'Flow logs should be configured for every network security group' - **Policy ID:** [4c3c6c5f-0d47-4402-99b8-aa543dd8bcee](https://portal.azure.com/#view/Microsoft_Azure_Policy/PolicyDetailBlade/definitionId/%2Fproviders%2FMicrosoft.Authorization%2FpolicyDefinitions%2F4c3c6c5f-0d47-4402-99b8-aa543dd8bcee) **- Name:** 'Flow logs should be configured for every virtual network'",
"AdditionalInformation": "",
"References": "https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-portal:https://learn.microsoft.com/en-us/security/benchmark/azure/mcsb-logging-threat-detection#lt-4-enable-network-logging-for-security-investigation",
"DefaultValue": "By default Network Security Group logs are not sent to Log Analytics."
}
@@ -709,17 +709,17 @@
},
{
"Id": "3.1.8",
"Description": "Ensure that Network Watcher flow logs are captured and sent to Log Analytics",
"Description": "Ensure that Network Security Group Flow logs are captured and sent to Log Analytics",
"Checks": [
"network_flow_log_captured_sent"
],
"Attributes": [
{
"Title": "Network Watcher flow logs are captured and sent to Log Analytics",
"Title": "Network Security Group Flow logs are captured and sent to Log Analytics",
"Section": "3. Logging and Monitoring",
"SubSection": "3.1 Logging",
"AttributeDescription": "Ensure that Network Watcher flow logs for supported targets, such as virtual networks and network security groups, are collected and sent to a central Log Analytics workspace for monitoring and analysis.",
"AdditionalInformation": "Capturing Network Watcher flow logs provides visibility into traffic patterns across your network, helping detect anomalies, potential lateral movement, and security threats. These logs integrate with Azure Monitor and Azure Sentinel, enabling advanced analytics and visualization for improved network security and incident response. For new deployments, prefer virtual network flow logs because NSG flow logs are on the retirement path.",
"AttributeDescription": "Ensure that network flow logs are collected and sent to a central Log Analytics workspace for monitoring and analysis.",
"AdditionalInformation": "Capturing network flow logs provides visibility into traffic patterns across your network, helping detect anomalies, potential lateral movement, and security threats. These logs integrate with Azure Monitor and Azure Sentinel, enabling advanced analytics and visualization for improved network security and incident response.",
"LevelOfRisk": 4,
"Weight": 100
}
@@ -763,17 +763,17 @@
},
{
"Id": "3.2.1",
"Description": "Ensure that Network Watcher flow log retention period is '0 or at least 90 days'",
"Description": "Ensure that Network Security Group Flow Log retention period is 'greater than 90 days'",
"Checks": [
"network_flow_log_more_than_90_days"
],
"Attributes": [
{
"Title": "Network Watcher flow log retention period is '0 or at least 90 days'",
"Title": "Network Security Group Flow Log retention period is 'greater than 90 days'",
"Section": "3. Logging and Monitoring",
"SubSection": "3.2 Retention",
"AttributeDescription": "Enable Network Watcher flow logs for supported targets, such as virtual networks and network security groups, and configure the retention period to 0 for unlimited retention or at least 90 days to capture and store IP traffic data for security monitoring and analysis.",
"AdditionalInformation": "Network Watcher flow logs provide visibility into network traffic, helping detect anomalies, unauthorized access, and potential security breaches. Retaining logs for 0 days (unlimited) or at least 90 days ensures that historical data is available for incident investigation, compliance, and forensic analysis, strengthening overall network security monitoring. For new deployments, prefer virtual network flow logs because NSG flow logs are on the retirement path.",
"AttributeDescription": "Enable Network Security Group (NSG) Flow Logs and configure the retention period to at least 90 days to capture and store IP traffic data for security monitoring and analysis.",
"AdditionalInformation": "NSG Flow Logs provide visibility into network traffic, helping detect anomalies, unauthorized access, and potential security breaches. Retaining logs for at least 90 days ensures that historical data is available for incident investigation, compliance, and forensic analysis, strengthening overall network security monitoring.",
"LevelOfRisk": 3,
"Weight": 10
}
+4 -6
View File
@@ -48,7 +48,7 @@ class _MutableTimestamp:
timestamp = _MutableTimestamp(datetime.today())
timestamp_utc = _MutableTimestamp(datetime.now(timezone.utc))
prowler_version = "5.26.0"
prowler_version = "5.25.3"
html_logo_url = "https://github.com/prowler-cloud/prowler/"
square_logo_img = "https://raw.githubusercontent.com/prowler-cloud/prowler/dc7d2d5aeb92fdf12e8604f42ef6472cd3e8e889/docs/img/prowler-logo-black.png"
aws_logo = "https://user-images.githubusercontent.com/38561120/235953920-3e3fba08-0795-41dc-b480-9bea57db9f2e.png"
@@ -87,8 +87,8 @@ def get_available_compliance_frameworks(provider=None):
providers = [p.value for p in Provider]
if provider:
providers = [provider]
for current_provider in providers:
compliance_dir = f"{actual_directory}/../compliance/{current_provider}"
for provider in providers:
compliance_dir = f"{actual_directory}/../compliance/{provider}"
if not os.path.isdir(compliance_dir):
continue
with os.scandir(compliance_dir) as files:
@@ -97,9 +97,7 @@ def get_available_compliance_frameworks(provider=None):
available_compliance_frameworks.append(
file.name.removesuffix(".json")
)
# Also scan top-level compliance/ for multi-provider (universal) JSONs.
# When a specific provider was requested, only include the framework if it
# declares support for that provider; otherwise include all universal frameworks.
# Also scan top-level compliance/ for multi-provider JSONs
compliance_root = f"{actual_directory}/../compliance"
if os.path.isdir(compliance_root):
with os.scandir(compliance_root) as files:
+7 -30
View File
@@ -299,22 +299,12 @@ def print_compliance_frameworks(
def print_compliance_requirements(
bulk_compliance_frameworks: dict, compliance_frameworks: list
):
from prowler.lib.check.compliance_models import ComplianceFramework
for compliance_framework in compliance_frameworks:
for key in bulk_compliance_frameworks.keys():
entry = bulk_compliance_frameworks[key]
is_universal = isinstance(entry, ComplianceFramework)
if is_universal:
framework = entry.framework
provider = entry.provider or "Multi-provider"
version = entry.version
requirements = entry.requirements
else:
framework = entry.Framework
provider = entry.Provider or "Multi-provider"
version = entry.Version
requirements = entry.Requirements
framework = bulk_compliance_frameworks[key].Framework
provider = bulk_compliance_frameworks[key].Provider
version = bulk_compliance_frameworks[key].Version
requirements = bulk_compliance_frameworks[key].Requirements
# We can list the compliance requirements for a given framework using the
# bulk_compliance_frameworks keys since they are the compliance specification file name
if compliance_framework == key:
@@ -323,23 +313,10 @@ def print_compliance_requirements(
)
for requirement in requirements:
checks = ""
if is_universal:
req_checks = requirement.checks
req_id = requirement.id
req_description = requirement.description
else:
req_checks = requirement.Checks
req_id = requirement.Id
req_description = requirement.Description
if isinstance(req_checks, dict):
for prov, check_list in req_checks.items():
for check in check_list:
checks += f" {Fore.YELLOW}\t\t[{prov}] {check}\n{Style.RESET_ALL}"
else:
for check in req_checks:
checks += f" {Fore.YELLOW}\t\t{check}\n{Style.RESET_ALL}"
for check in requirement.Checks:
checks += f" {Fore.YELLOW}\t\t{check}\n{Style.RESET_ALL}"
print(
f"Requirement Id: {Fore.MAGENTA}{req_id}{Style.RESET_ALL}\n\t- Description: {req_description}\n\t- Checks:\n{checks}"
f"Requirement Id: {Fore.MAGENTA}{requirement.Id}{Style.RESET_ALL}\n\t- Description: {requirement.Description}\n\t- Checks:\n{checks}"
)
+5 -15
View File
@@ -22,7 +22,6 @@ def load_checks_to_execute(
categories: set = None,
resource_groups: set = None,
list_checks: bool = False,
universal_frameworks: dict = None,
) -> set:
"""Generate the list of checks to execute based on the cloud provider and the input arguments given"""
try:
@@ -156,21 +155,12 @@ def load_checks_to_execute(
if not bulk_compliance_frameworks:
bulk_compliance_frameworks = Compliance.get_bulk(provider=provider)
for compliance_framework in compliance_frameworks:
# Try universal frameworks first (snake_case dict-keyed checks)
if (
universal_frameworks
and compliance_framework in universal_frameworks
):
fw = universal_frameworks[compliance_framework]
for req in fw.requirements:
checks_to_execute.update(req.checks.get(provider.lower(), []))
elif compliance_framework in bulk_compliance_frameworks:
checks_to_execute.update(
CheckMetadata.list(
bulk_compliance_frameworks=bulk_compliance_frameworks,
compliance_framework=compliance_framework,
)
checks_to_execute.update(
CheckMetadata.list(
bulk_compliance_frameworks=bulk_compliance_frameworks,
compliance_framework=compliance_framework,
)
)
# Handle if there are categories passed using --categories
elif categories:
-43
View File
@@ -102,48 +102,6 @@ class CIS_Requirement_Attribute(BaseModel):
References: str
class EssentialEight_Requirement_Attribute_MaturityLevel(str, Enum):
"""ASD Essential Eight Maturity Level"""
ML1 = "ML1"
ML2 = "ML2"
ML3 = "ML3"
class EssentialEight_Requirement_Attribute_AssessmentStatus(str, Enum):
"""Essential Eight Requirement Attribute Assessment Status"""
Manual = "Manual"
Automated = "Automated"
class EssentialEight_Requirement_Attribute_CloudApplicability(str, Enum):
"""How well the ASD control maps to AWS cloud infrastructure."""
Full = "full"
Partial = "partial"
Limited = "limited"
NonApplicable = "non-applicable"
# Essential Eight Requirement Attribute
class EssentialEight_Requirement_Attribute(BaseModel):
"""ASD Essential Eight Requirement Attribute"""
Section: str
MaturityLevel: EssentialEight_Requirement_Attribute_MaturityLevel
AssessmentStatus: EssentialEight_Requirement_Attribute_AssessmentStatus
CloudApplicability: EssentialEight_Requirement_Attribute_CloudApplicability
MitigatedThreats: list[str]
Description: str
RationaleStatement: str
ImpactStatement: str
RemediationProcedure: str
AuditProcedure: str
AdditionalInformation: str
References: str
# Well Architected Requirement Attribute
class AWS_Well_Architected_Requirement_Attribute(BaseModel):
"""AWS Well Architected Requirement Attribute"""
@@ -292,7 +250,6 @@ class Compliance_Requirement(BaseModel):
Name: Optional[str] = None
Attributes: list[
Union[
EssentialEight_Requirement_Attribute,
CIS_Requirement_Attribute,
ENS_Requirement_Attribute,
ISO27001_2013_Requirement_Attribute,
+12 -16
View File
@@ -62,9 +62,6 @@ VALID_CATEGORIES = frozenset(
"e5",
"privilege-escalation",
"ec2-imdsv1",
"vercel-hobby-plan",
"vercel-pro-plan",
"vercel-enterprise-plan",
}
)
@@ -247,15 +244,14 @@ class CheckMetadata(BaseModel):
# store the compliance later if supplied
Compliance: Optional[list[Any]] = Field(default_factory=list)
# TODO: Remove noqa and fix cls vulture errors
@validator("Categories", each_item=True, pre=True, always=True)
def valid_category(cls, value, values): # noqa: F841
def valid_category(cls, value, values):
if not isinstance(value, str):
raise ValueError("Categories must be a list of strings")
value_lower = value.lower()
if not re.match("^[a-z0-9-]+$", value_lower):
raise ValueError(
f"Invalid category: {value}. Categories can only contain lowercase letters, numbers, and hyphen '-'"
f"Invalid category: {value}. Categories can only contain lowercase letters, numbers and hyphen '-'"
)
if (
value_lower not in VALID_CATEGORIES
@@ -283,7 +279,7 @@ class CheckMetadata(BaseModel):
return resource_type
@validator("ServiceName", pre=True, always=True)
def validate_service_name(cls, service_name, values): # noqa: F841
def validate_service_name(cls, service_name, values):
if not service_name:
raise ValueError("ServiceName must be a non-empty string")
@@ -300,7 +296,7 @@ class CheckMetadata(BaseModel):
return service_name
@validator("CheckID", pre=True, always=True)
def valid_check_id(cls, check_id, values): # noqa: F841
def valid_check_id(cls, check_id, values):
if not check_id:
raise ValueError("CheckID must be a non-empty string")
@@ -313,7 +309,7 @@ class CheckMetadata(BaseModel):
return check_id
@validator("CheckTitle", pre=True, always=True)
def validate_check_title(cls, check_title, values): # noqa: F841
def validate_check_title(cls, check_title, values):
if values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
if len(check_title) > 150:
raise ValueError(
@@ -326,13 +322,13 @@ class CheckMetadata(BaseModel):
return check_title
@validator("RelatedUrl", pre=True, always=True)
def validate_related_url(cls, related_url, values): # noqa: F841
def validate_related_url(cls, related_url, values):
if related_url and values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
raise ValueError("RelatedUrl must be empty. This field is deprecated.")
return related_url
@validator("Remediation")
def validate_recommendation_url(cls, remediation, values): # noqa: F841
def validate_recommendation_url(cls, remediation, values):
if values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
url = remediation.Recommendation.Url
if url and not url.startswith("https://hub.prowler.com/"):
@@ -342,7 +338,7 @@ class CheckMetadata(BaseModel):
return remediation
@validator("CheckType", pre=True, always=True)
def validate_check_type(cls, check_type, values): # noqa: F841
def validate_check_type(cls, check_type, values):
provider = values.get("Provider", "").lower()
# Non-AWS providers must have an empty CheckType list
@@ -371,7 +367,7 @@ class CheckMetadata(BaseModel):
return check_type
@validator("Description", pre=True, always=True)
def validate_description(cls, description, values): # noqa: F841
def validate_description(cls, description, values):
if values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
if len(description) > 400:
raise ValueError(
@@ -380,7 +376,7 @@ class CheckMetadata(BaseModel):
return description
@validator("Risk", pre=True, always=True)
def validate_risk(cls, risk, values): # noqa: F841
def validate_risk(cls, risk, values):
if values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
if len(risk) > 400:
raise ValueError(
@@ -389,7 +385,7 @@ class CheckMetadata(BaseModel):
return risk
@validator("ResourceGroup", pre=True, always=True)
def validate_resource_group(cls, resource_group): # noqa: F841
def validate_resource_group(cls, resource_group):
if resource_group and resource_group not in VALID_RESOURCE_GROUPS:
raise ValueError(
f"Invalid ResourceGroup: '{resource_group}'. Must be one of: {', '.join(sorted(VALID_RESOURCE_GROUPS))} or empty string."
@@ -397,7 +393,7 @@ class CheckMetadata(BaseModel):
return resource_group
@validator("AdditionalURLs", pre=True, always=True)
def validate_additional_urls(cls, additional_urls): # noqa: F841
def validate_additional_urls(cls, additional_urls):
if not isinstance(additional_urls, list):
raise ValueError("AdditionalURLs must be a list")
+62 -141
View File
@@ -1,17 +1,12 @@
import sys
from prowler.lib.check.models import Check_Report
from prowler.lib.logger import logger
from prowler.lib.outputs.compliance.c5.c5 import get_c5_table
from prowler.lib.outputs.compliance.ccc.ccc import get_ccc_table
from prowler.lib.outputs.compliance.cis.cis import get_cis_table
from prowler.lib.outputs.compliance.compliance_check import ( # noqa: F401 - re-export for backward compatibility
get_check_compliance,
)
from prowler.lib.outputs.compliance.csa.csa import get_csa_table
from prowler.lib.outputs.compliance.ens.ens import get_ens_table
from prowler.lib.outputs.compliance.essential_eight.essential_eight import (
get_essential_eight_table,
)
from prowler.lib.outputs.compliance.generic.generic_table import (
get_generic_compliance_table,
)
@@ -22,94 +17,6 @@ from prowler.lib.outputs.compliance.mitre_attack.mitre_attack import (
from prowler.lib.outputs.compliance.prowler_threatscore.prowler_threatscore import (
get_prowler_threatscore_table,
)
from prowler.lib.outputs.compliance.universal.universal_table import get_universal_table
def process_universal_compliance_frameworks(
input_compliance_frameworks: set,
universal_frameworks: dict,
finding_outputs: list,
output_directory: str,
output_filename: str,
provider: str,
generated_outputs: dict,
) -> set:
"""Process universal compliance frameworks, generating CSV and OCSF outputs.
For each framework in *input_compliance_frameworks* that exists in
*universal_frameworks* and has an outputs.table_config, this function
creates both a CSV (UniversalComplianceOutput) and an OCSF JSON
(OCSFComplianceOutput) file. OCSF is always generated regardless of
the user's ``--output-formats`` flag.
The function is idempotent: it tracks already-created writers via
``generated_outputs["compliance"]`` keyed by ``file_path``. If invoked
again for the same framework (e.g. once per streaming batch), it
reuses the existing writer instead of recreating it. This guarantees
one output writer per framework for the whole execution and keeps
the OCSF JSON array valid across multiple calls.
Returns the set of framework names that were processed so the caller
can remove them before entering the legacy per-provider output loop.
"""
from prowler.lib.outputs.compliance.universal.ocsf_compliance import (
OCSFComplianceOutput,
)
from prowler.lib.outputs.compliance.universal.universal_output import (
UniversalComplianceOutput,
)
existing_writers = {
getattr(out, "file_path", None): out
for out in generated_outputs.get("compliance", [])
if isinstance(out, (UniversalComplianceOutput, OCSFComplianceOutput))
}
processed = set()
for compliance_name in input_compliance_frameworks:
if not (
compliance_name in universal_frameworks
and universal_frameworks[compliance_name].outputs
and universal_frameworks[compliance_name].outputs.table_config
):
continue
fw = universal_frameworks[compliance_name]
# CSV output
csv_path = (
f"{output_directory}/compliance/" f"{output_filename}_{compliance_name}.csv"
)
if csv_path not in existing_writers:
output = UniversalComplianceOutput(
findings=finding_outputs,
framework=fw,
file_path=csv_path,
provider=provider,
)
generated_outputs["compliance"].append(output)
existing_writers[csv_path] = output
output.batch_write_data_to_file()
# OCSF output (always generated for universal frameworks)
ocsf_path = (
f"{output_directory}/compliance/"
f"{output_filename}_{compliance_name}.ocsf.json"
)
if ocsf_path not in existing_writers:
ocsf_output = OCSFComplianceOutput(
findings=finding_outputs,
framework=fw,
file_path=ocsf_path,
provider=provider,
)
generated_outputs["compliance"].append(ocsf_output)
existing_writers[ocsf_path] = ocsf_output
ocsf_output.batch_write_data_to_file()
processed.add(compliance_name)
return processed
def display_compliance_table(
@@ -119,9 +26,6 @@ def display_compliance_table(
output_filename: str,
output_directory: str,
compliance_overview: bool,
universal_frameworks: dict = None,
provider: str = None,
output_formats: list = None,
) -> None:
"""
display_compliance_table generates the compliance table for the given compliance framework.
@@ -133,9 +37,6 @@ def display_compliance_table(
output_filename (str): The output filename
output_directory (str): The output directory
compliance_overview (bool): The compliance
universal_frameworks (dict): Optional universal ComplianceFramework objects
provider (str): The current provider (e.g. "aws") for multi-provider filtering
output_formats (list): The output formats to generate
Returns:
None
@@ -144,33 +45,7 @@ def display_compliance_table(
findings = [f for f in findings if f.check_metadata.CheckID in bulk_checks_metadata]
try:
# Universal path: if the framework has TableConfig, use the universal renderer
if universal_frameworks and compliance_framework in universal_frameworks:
fw = universal_frameworks[compliance_framework]
if fw.outputs and fw.outputs.table_config:
get_universal_table(
findings,
bulk_checks_metadata,
compliance_framework,
output_filename,
output_directory,
compliance_overview,
framework=fw,
provider=provider,
output_formats=output_formats,
)
return
if compliance_framework.startswith("cis_"):
get_cis_table(
findings,
bulk_checks_metadata,
compliance_framework,
output_filename,
output_directory,
compliance_overview,
)
elif compliance_framework.startswith("ens_"):
if "ens_" in compliance_framework:
get_ens_table(
findings,
bulk_checks_metadata,
@@ -179,7 +54,16 @@ def display_compliance_table(
output_directory,
compliance_overview,
)
elif compliance_framework.startswith("mitre_attack"):
elif "cis_" in compliance_framework:
get_cis_table(
findings,
bulk_checks_metadata,
compliance_framework,
output_filename,
output_directory,
compliance_overview,
)
elif "mitre_attack" in compliance_framework:
get_mitre_attack_table(
findings,
bulk_checks_metadata,
@@ -188,7 +72,7 @@ def display_compliance_table(
output_directory,
compliance_overview,
)
elif compliance_framework.startswith("kisa"):
elif "kisa_isms_" in compliance_framework:
get_kisa_ismsp_table(
findings,
bulk_checks_metadata,
@@ -197,7 +81,7 @@ def display_compliance_table(
output_directory,
compliance_overview,
)
elif compliance_framework.startswith("prowler_threatscore_"):
elif "threatscore_" in compliance_framework:
get_prowler_threatscore_table(
findings,
bulk_checks_metadata,
@@ -206,7 +90,7 @@ def display_compliance_table(
output_directory,
compliance_overview,
)
elif compliance_framework.startswith("csa_ccm_"):
elif "csa_ccm_" in compliance_framework:
get_csa_table(
findings,
bulk_checks_metadata,
@@ -215,7 +99,7 @@ def display_compliance_table(
output_directory,
compliance_overview,
)
elif compliance_framework.startswith("c5_"):
elif "c5_" in compliance_framework:
get_c5_table(
findings,
bulk_checks_metadata,
@@ -233,15 +117,6 @@ def display_compliance_table(
output_directory,
compliance_overview,
)
elif "essential_eight" in compliance_framework:
get_essential_eight_table(
findings,
bulk_checks_metadata,
compliance_framework,
output_filename,
output_directory,
compliance_overview,
)
else:
get_generic_compliance_table(
findings,
@@ -256,3 +131,49 @@ def display_compliance_table(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)
sys.exit(1)
# TODO: this should be in the Check class
def get_check_compliance(
finding: Check_Report, provider_type: str, bulk_checks_metadata: dict
) -> dict:
"""get_check_compliance returns a map with the compliance framework as key and the requirements where the finding's check is present.
Example:
{
"CIS-1.4": ["2.1.3"],
"CIS-1.5": ["2.1.3"],
}
Args:
finding (Any): The Check_Report finding
provider_type (str): The provider type
bulk_checks_metadata (dict): The bulk checks metadata
Returns:
dict: The compliance framework as key and the requirements where the finding's check is present.
"""
try:
check_compliance = {}
# We have to retrieve all the check's compliance requirements
if finding.check_metadata.CheckID in bulk_checks_metadata:
for compliance in bulk_checks_metadata[
finding.check_metadata.CheckID
].Compliance:
compliance_fw = compliance.Framework
if compliance.Version:
compliance_fw = f"{compliance_fw}-{compliance.Version}"
# compliance.Provider == "Azure" or "Kubernetes"
# provider_type == "azure" or "kubernetes"
if compliance.Provider.upper() == provider_type.upper():
if compliance_fw not in check_compliance:
check_compliance[compliance_fw] = []
for requirement in compliance.Requirements:
check_compliance[compliance_fw].append(requirement.Id)
return check_compliance
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
return {}
@@ -1,48 +0,0 @@
from prowler.lib.check.models import Check_Report
from prowler.lib.logger import logger
# TODO: this should be in the Check class
def get_check_compliance(
finding: Check_Report, provider_type: str, bulk_checks_metadata: dict
) -> dict:
"""get_check_compliance returns a map with the compliance framework as key and the requirements where the finding's check is present.
Example:
{
"CIS-1.4": ["2.1.3"],
"CIS-1.5": ["2.1.3"],
}
Args:
finding (Any): The Check_Report finding
provider_type (str): The provider type
bulk_checks_metadata (dict): The bulk checks metadata
Returns:
dict: The compliance framework as key and the requirements where the finding's check is present.
"""
try:
check_compliance = {}
# We have to retrieve all the check's compliance requirements
if finding.check_metadata.CheckID in bulk_checks_metadata:
for compliance in bulk_checks_metadata[
finding.check_metadata.CheckID
].Compliance:
compliance_fw = compliance.Framework
if compliance.Version:
compliance_fw = f"{compliance_fw}-{compliance.Version}"
# compliance.Provider == "Azure" or "Kubernetes"
# provider_type == "azure" or "kubernetes"
if compliance.Provider.upper() == provider_type.upper():
if compliance_fw not in check_compliance:
check_compliance[compliance_fw] = []
for requirement in compliance.Requirements:
check_compliance[compliance_fw].append(requirement.Id)
return check_compliance
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
return {}
@@ -1,98 +0,0 @@
from colorama import Fore, Style
from tabulate import tabulate
from prowler.config.config import orange_color
def get_essential_eight_table(
findings: list,
bulk_checks_metadata: dict,
compliance_framework: str,
output_filename: str,
output_directory: str,
compliance_overview: bool,
):
sections = {}
essential_eight_compliance_table = {
"Provider": [],
"Section": [],
"Status": [],
"Muted": [],
}
pass_count = []
fail_count = []
muted_count = []
for index, finding in enumerate(findings):
check = bulk_checks_metadata[finding.check_metadata.CheckID]
check_compliances = check.Compliance
for compliance in check_compliances:
if compliance.Framework == "Essential-Eight":
for requirement in compliance.Requirements:
for attribute in requirement.Attributes:
section = attribute.Section
if section not in sections:
sections[section] = {
"FAIL": 0,
"PASS": 0,
"Muted": 0,
}
if finding.muted:
if index not in muted_count:
muted_count.append(index)
sections[section]["Muted"] += 1
else:
if finding.status == "FAIL" and index not in fail_count:
fail_count.append(index)
sections[section]["FAIL"] += 1
elif finding.status == "PASS" and index not in pass_count:
pass_count.append(index)
sections[section]["PASS"] += 1
sections = dict(sorted(sections.items()))
for section in sections:
essential_eight_compliance_table["Provider"].append(compliance.Provider)
essential_eight_compliance_table["Section"].append(section)
if sections[section]["FAIL"] > 0:
essential_eight_compliance_table["Status"].append(
f"{Fore.RED}FAIL({sections[section]['FAIL']}){Style.RESET_ALL}"
)
elif sections[section]["PASS"] > 0:
essential_eight_compliance_table["Status"].append(
f"{Fore.GREEN}PASS({sections[section]['PASS']}){Style.RESET_ALL}"
)
else:
essential_eight_compliance_table["Status"].append("-")
essential_eight_compliance_table["Muted"].append(
f"{orange_color}{sections[section]['Muted']}{Style.RESET_ALL}"
)
if len(fail_count) + len(pass_count) + len(muted_count) > 1:
print(
f"\nCompliance Status of {Fore.YELLOW}{compliance_framework.upper()}{Style.RESET_ALL} Framework:"
)
total_findings_count = len(fail_count) + len(pass_count) + len(muted_count)
overview_table = [
[
f"{Fore.RED}{round(len(fail_count) / total_findings_count * 100, 2)}% ({len(fail_count)}) FAIL{Style.RESET_ALL}",
f"{Fore.GREEN}{round(len(pass_count) / total_findings_count * 100, 2)}% ({len(pass_count)}) PASS{Style.RESET_ALL}",
f"{orange_color}{round(len(muted_count) / total_findings_count * 100, 2)}% ({len(muted_count)}) MUTED{Style.RESET_ALL}",
]
]
print(tabulate(overview_table, tablefmt="rounded_grid"))
if not compliance_overview:
print(
f"\nFramework {Fore.YELLOW}{compliance_framework.upper()}{Style.RESET_ALL} Results:"
)
print(
tabulate(
essential_eight_compliance_table,
headers="keys",
tablefmt="rounded_grid",
)
)
print(
f"{Style.BRIGHT}* Only sections containing results appear.{Style.RESET_ALL}"
)
print(f"\nDetailed results of {compliance_framework.upper()} are in:")
print(
f" - CSV: {output_directory}/compliance/{output_filename}_{compliance_framework}.csv\n"
)
@@ -1,111 +0,0 @@
from prowler.config.config import timestamp
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.outputs.compliance.compliance_output import ComplianceOutput
from prowler.lib.outputs.compliance.essential_eight.models import (
EssentialEightAWSModel,
)
from prowler.lib.outputs.finding import Finding
class EssentialEightAWS(ComplianceOutput):
"""
This class represents the AWS ASD Essential Eight compliance output.
Attributes:
- _data (list): A list to store transformed data from findings.
- _file_descriptor (TextIOWrapper): A file descriptor to write data to a file.
Methods:
- transform: Transforms findings into AWS Essential Eight compliance format.
"""
def transform(
self,
findings: list[Finding],
compliance: Compliance,
compliance_name: str,
) -> None:
"""
Transforms a list of findings into AWS Essential Eight compliance format.
Parameters:
- findings (list): A list of findings.
- compliance (Compliance): A compliance model.
- compliance_name (str): The name of the compliance model.
Returns:
- None
"""
for finding in findings:
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = EssentialEightAWSModel(
Provider=finding.provider,
Description=compliance.Description,
AccountId=finding.account_uid,
Region=finding.region,
AssessmentDate=str(timestamp),
Requirements_Id=requirement.Id,
Requirements_Description=requirement.Description,
Requirements_Attributes_Section=attribute.Section,
Requirements_Attributes_MaturityLevel=attribute.MaturityLevel,
Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus,
Requirements_Attributes_CloudApplicability=attribute.CloudApplicability,
Requirements_Attributes_MitigatedThreats=", ".join(
attribute.MitigatedThreats
),
Requirements_Attributes_Description=attribute.Description,
Requirements_Attributes_RationaleStatement=attribute.RationaleStatement,
Requirements_Attributes_ImpactStatement=attribute.ImpactStatement,
Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure,
Requirements_Attributes_AuditProcedure=attribute.AuditProcedure,
Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation,
Requirements_Attributes_References=attribute.References,
Status=finding.status,
StatusExtended=finding.status_extended,
ResourceId=finding.resource_uid,
ResourceName=finding.resource_name,
CheckId=finding.check_id,
Muted=finding.muted,
Framework=compliance.Framework,
Name=compliance.Name,
)
self._data.append(compliance_row)
# Add manual requirements to the compliance output
for requirement in compliance.Requirements:
if not requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = EssentialEightAWSModel(
Provider=compliance.Provider.lower(),
Description=compliance.Description,
AccountId="",
Region="",
AssessmentDate=str(timestamp),
Requirements_Id=requirement.Id,
Requirements_Description=requirement.Description,
Requirements_Attributes_Section=attribute.Section,
Requirements_Attributes_MaturityLevel=attribute.MaturityLevel,
Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus,
Requirements_Attributes_CloudApplicability=attribute.CloudApplicability,
Requirements_Attributes_MitigatedThreats=", ".join(
attribute.MitigatedThreats
),
Requirements_Attributes_Description=attribute.Description,
Requirements_Attributes_RationaleStatement=attribute.RationaleStatement,
Requirements_Attributes_ImpactStatement=attribute.ImpactStatement,
Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure,
Requirements_Attributes_AuditProcedure=attribute.AuditProcedure,
Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation,
Requirements_Attributes_References=attribute.References,
Status="MANUAL",
StatusExtended="Manual check",
ResourceId="manual_check",
ResourceName="Manual check",
CheckId="manual",
Muted=False,
Framework=compliance.Framework,
Name=compliance.Name,
)
self._data.append(compliance_row)
@@ -1,35 +0,0 @@
from pydantic.v1 import BaseModel
class EssentialEightAWSModel(BaseModel):
"""
EssentialEightAWSModel generates a finding's output in AWS ASD Essential Eight Compliance format.
"""
Provider: str
Description: str
AccountId: str
Region: str
AssessmentDate: str
Requirements_Id: str
Requirements_Description: str
Requirements_Attributes_Section: str
Requirements_Attributes_MaturityLevel: str
Requirements_Attributes_AssessmentStatus: str
Requirements_Attributes_CloudApplicability: str
Requirements_Attributes_MitigatedThreats: str
Requirements_Attributes_Description: str
Requirements_Attributes_RationaleStatement: str
Requirements_Attributes_ImpactStatement: str
Requirements_Attributes_RemediationProcedure: str
Requirements_Attributes_AuditProcedure: str
Requirements_Attributes_AdditionalInformation: str
Requirements_Attributes_References: str
Status: str
StatusExtended: str
ResourceId: str
ResourceName: str
CheckId: str
Muted: bool
Framework: str
Name: str
@@ -1,7 +1,6 @@
import json
import os
from datetime import datetime
from typing import TYPE_CHECKING, List
from typing import List
from py_ocsf_models.events.base_event import SeverityID
from py_ocsf_models.events.base_event import StatusID as EventStatusID
@@ -21,12 +20,11 @@ from py_ocsf_models.objects.resource_details import ResourceDetails
from prowler.config.config import prowler_version
from prowler.lib.check.compliance_models import ComplianceFramework
from prowler.lib.logger import logger
from prowler.lib.outputs.finding import Finding
from prowler.lib.outputs.ocsf.ocsf import OCSF
from prowler.lib.outputs.utils import unroll_dict_to_list
from prowler.lib.utils.utils import open_file
if TYPE_CHECKING:
from prowler.lib.outputs.finding import Finding
PROWLER_TO_COMPLIANCE_STATUS = {
"PASS": ComplianceStatusID.Pass,
"FAIL": ComplianceStatusID.Fail,
@@ -34,40 +32,6 @@ PROWLER_TO_COMPLIANCE_STATUS = {
}
def _sanitize_resource_data(resource_details, resource_metadata) -> dict:
"""Ensure resource data is JSON-serializable.
Service resource_metadata may carry non-serializable objects (e.g. raw
Pydantic models or service classes such as ``Trail`` / ``LifecyclePolicy``).
Convert them to plain dicts and roundtrip through JSON so the resulting
ComplianceFinding can be serialized without errors.
"""
def _make_serializable(obj):
if hasattr(obj, "model_dump") and callable(obj.model_dump):
return _make_serializable(obj.model_dump())
if hasattr(obj, "dict") and callable(obj.dict):
return _make_serializable(obj.dict())
if isinstance(obj, dict):
return {str(k): _make_serializable(v) for k, v in obj.items()}
if isinstance(obj, (list, tuple)):
return [_make_serializable(v) for v in obj]
return obj
try:
converted = _make_serializable(resource_metadata)
sanitized_metadata = json.loads(json.dumps(converted, default=str))
except (TypeError, ValueError, RecursionError) as error:
logger.warning(
f"Failed to serialize resource metadata, defaulting to empty: {error}"
)
sanitized_metadata = {}
return {
"details": resource_details,
"metadata": sanitized_metadata,
}
def _to_snake_case(name: str) -> str:
"""Convert a PascalCase or camelCase string to snake_case."""
import re
@@ -144,7 +108,7 @@ class OCSFComplianceOutput:
def _transform(
self,
findings: List["Finding"],
findings: List[Finding],
framework: ComplianceFramework,
compliance_name: str,
) -> None:
@@ -213,7 +177,7 @@ class OCSFComplianceOutput:
def _build_compliance_finding(
self,
finding: "Finding",
finding: Finding,
framework: ComplianceFramework,
requirement,
compliance_name: str,
@@ -231,9 +195,7 @@ class OCSFComplianceOutput:
finding.metadata.Severity.capitalize(),
SeverityID.Unknown,
)
event_status = (
EventStatusID.Suppressed if finding.muted else EventStatusID.New
)
event_status = OCSF.get_finding_status_id(finding.muted)
time_value = (
int(finding.timestamp.timestamp())
@@ -306,10 +268,10 @@ class OCSFComplianceOutput:
if finding.provider == "kubernetes"
else None
),
data=_sanitize_resource_data(
finding.resource_details,
finding.resource_metadata,
),
data={
"details": finding.resource_details,
"metadata": finding.resource_metadata,
},
)
],
severity_id=finding_severity.value,
@@ -1,294 +0,0 @@
from csv import DictWriter
from pathlib import Path
from typing import TYPE_CHECKING, Optional
from pydantic.v1 import create_model
from prowler.config.config import timestamp
from prowler.lib.check.compliance_models import ComplianceFramework
from prowler.lib.logger import logger
from prowler.lib.utils.utils import open_file
if TYPE_CHECKING:
from prowler.lib.outputs.finding import Finding
PROVIDER_HEADER_MAP = {
"aws": ("AccountId", "account_uid", "Region", "region"),
"azure": ("SubscriptionId", "account_uid", "Location", "region"),
"gcp": ("ProjectId", "account_uid", "Location", "region"),
"kubernetes": ("Context", "account_name", "Namespace", "region"),
"m365": ("TenantId", "account_uid", "Location", "region"),
"github": ("Account_Name", "account_name", "Account_Id", "account_uid"),
"oraclecloud": ("TenancyId", "account_uid", "Region", "region"),
"alibabacloud": ("AccountId", "account_uid", "Region", "region"),
"nhn": ("AccountId", "account_uid", "Region", "region"),
}
_DEFAULT_HEADERS = ("AccountId", "account_uid", "Region", "region")
class UniversalComplianceOutput:
"""Universal compliance CSV output driven by ComplianceFramework metadata.
Dynamically builds a Pydantic row model from attributes_metadata so that
CSV columns match the framework's declared attribute fields.
"""
def __init__(
self,
findings: list,
framework: ComplianceFramework,
file_path: str = None,
from_cli: bool = True,
provider: str = None,
) -> None:
self._data = []
self._file_descriptor = None
self.file_path = file_path
self._from_cli = from_cli
self._provider = provider
self.close_file = False
if file_path:
path_obj = Path(file_path)
self._file_extension = path_obj.suffix if path_obj.suffix else ""
if findings:
self._row_model = self._build_row_model(framework)
compliance_name = (
framework.framework + "-" + framework.version
if framework.version
else framework.framework
)
self._transform(findings, framework, compliance_name)
if not self._file_descriptor and file_path:
self._create_file_descriptor(file_path)
@property
def data(self):
return self._data
def _build_row_model(self, framework: ComplianceFramework):
"""Build a dynamic Pydantic model from attributes_metadata."""
acct_header, acct_field, loc_header, loc_field = PROVIDER_HEADER_MAP.get(
(self._provider or "").lower(), _DEFAULT_HEADERS
)
self._acct_header = acct_header
self._acct_field = acct_field
self._loc_header = loc_header
self._loc_field = loc_field
# Base fields present in every compliance CSV
fields = {
"Provider": (str, ...),
"Description": (str, ...),
acct_header: (str, ...),
loc_header: (str, ...),
"AssessmentDate": (str, ...),
"Requirements_Id": (str, ...),
"Requirements_Description": (str, ...),
}
# Dynamic attribute columns from metadata
if framework.attributes_metadata:
for attr_meta in framework.attributes_metadata:
if not attr_meta.output_formats.csv:
continue
field_name = f"Requirements_Attributes_{attr_meta.key}"
# Map type strings to Python types
type_map = {
"str": Optional[str],
"int": Optional[int],
"float": Optional[float],
"bool": Optional[bool],
"list_str": Optional[str], # Serialized as joined string
"list_dict": Optional[str], # Serialized as string
}
py_type = type_map.get(attr_meta.type, Optional[str])
fields[field_name] = (py_type, None)
# Check if any requirement has MITRE fields
has_mitre = any(req.tactics for req in framework.requirements if req.tactics)
if has_mitre:
fields["Requirements_Tactics"] = (Optional[str], None)
fields["Requirements_SubTechniques"] = (Optional[str], None)
fields["Requirements_Platforms"] = (Optional[str], None)
fields["Requirements_TechniqueURL"] = (Optional[str], None)
# Trailing fields
fields["Status"] = (str, ...)
fields["StatusExtended"] = (str, ...)
fields["ResourceId"] = (str, ...)
fields["ResourceName"] = (str, ...)
fields["CheckId"] = (str, ...)
fields["Muted"] = (bool, ...)
fields["Framework"] = (str, ...)
fields["Name"] = (str, ...)
return create_model("UniversalComplianceRow", **fields)
def _serialize_attr_value(self, value):
"""Serialize attribute values for CSV."""
if isinstance(value, list):
if value and isinstance(value[0], dict):
return str(value)
return " | ".join(str(v) for v in value)
return value
def _build_row(self, finding, framework, requirement, is_manual=False):
"""Build a single row dict for a finding + requirement combination."""
row = {
"Provider": (
finding.provider
if not is_manual
else (framework.provider or self._provider or "").lower()
),
"Description": framework.description,
self._acct_header: (
getattr(finding, self._acct_field, "") if not is_manual else ""
),
self._loc_header: (
getattr(finding, self._loc_field, "") if not is_manual else ""
),
"AssessmentDate": str(timestamp),
"Requirements_Id": requirement.id,
"Requirements_Description": requirement.description,
}
# Add dynamic attribute columns
if framework.attributes_metadata:
for attr_meta in framework.attributes_metadata:
if not attr_meta.output_formats.csv:
continue
field_name = f"Requirements_Attributes_{attr_meta.key}"
raw_val = requirement.attributes.get(attr_meta.key)
row[field_name] = (
self._serialize_attr_value(raw_val) if raw_val is not None else None
)
# MITRE fields
if requirement.tactics:
row["Requirements_Tactics"] = (
" | ".join(requirement.tactics) if requirement.tactics else None
)
row["Requirements_SubTechniques"] = (
" | ".join(requirement.sub_techniques)
if requirement.sub_techniques
else None
)
row["Requirements_Platforms"] = (
" | ".join(requirement.platforms) if requirement.platforms else None
)
row["Requirements_TechniqueURL"] = requirement.technique_url
row["Status"] = finding.status if not is_manual else "MANUAL"
row["StatusExtended"] = (
finding.status_extended if not is_manual else "Manual check"
)
row["ResourceId"] = finding.resource_uid if not is_manual else "manual_check"
row["ResourceName"] = finding.resource_name if not is_manual else "Manual check"
row["CheckId"] = finding.check_id if not is_manual else "manual"
row["Muted"] = finding.muted if not is_manual else False
row["Framework"] = framework.framework
row["Name"] = framework.name
return row
def _transform(
self,
findings: list["Finding"],
framework: ComplianceFramework,
compliance_name: str,
) -> None:
"""Transform findings into universal compliance CSV rows."""
# Build check -> requirements map (filtered by provider for dict checks)
check_req_map = {}
for req in framework.requirements:
checks = req.checks
if self._provider:
all_checks = checks.get(self._provider.lower(), [])
else:
all_checks = []
for check_list in checks.values():
all_checks.extend(check_list)
for check_id in all_checks:
if check_id not in check_req_map:
check_req_map[check_id] = []
check_req_map[check_id].append(req)
# Process findings using the provider-filtered check_req_map.
# This ensures that for multi-provider dict checks, only the checks
# belonging to the current provider produce output rows.
for finding in findings:
check_id = finding.check_id
if check_id in check_req_map:
for req in check_req_map[check_id]:
row = self._build_row(finding, framework, req)
try:
self._data.append(self._row_model(**row))
except Exception as e:
logger.debug(f"Skipping row for {req.id}: {e}")
# Manual requirements (no checks or empty dict)
for req in framework.requirements:
checks = req.checks
if self._provider:
has_checks = bool(checks.get(self._provider.lower(), []))
else:
has_checks = any(checks.values())
if not has_checks:
# Use a dummy finding-like namespace for manual rows
row = self._build_row(
_ManualFindingStub(), framework, req, is_manual=True
)
try:
self._data.append(self._row_model(**row))
except Exception as e:
logger.debug(f"Skipping manual row for {req.id}: {e}")
def _create_file_descriptor(self, file_path: str) -> None:
try:
self._file_descriptor = open_file(file_path, "a")
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def batch_write_data_to_file(self) -> None:
"""Write findings data to CSV."""
try:
if (
getattr(self, "_file_descriptor", None)
and not self._file_descriptor.closed
and self._data
):
csv_writer = DictWriter(
self._file_descriptor,
fieldnames=[field.upper() for field in self._data[0].dict().keys()],
delimiter=";",
)
if self._file_descriptor.tell() == 0:
csv_writer.writeheader()
for row in self._data:
csv_writer.writerow({k.upper(): v for k, v in row.dict().items()})
if self.close_file or self._from_cli:
self._file_descriptor.close()
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class _ManualFindingStub:
"""Minimal stub to satisfy _build_row for manual requirements."""
provider = ""
account_uid = ""
account_name = ""
region = ""
status = "MANUAL"
status_extended = "Manual check"
resource_uid = "manual_check"
resource_name = "Manual check"
check_id = "manual"
muted = False
+1 -1
View File
@@ -15,7 +15,7 @@ from prowler.lib.check.models import (
)
from prowler.lib.logger import logger
from prowler.lib.outputs.common import Status, fill_common_finding_data
from prowler.lib.outputs.compliance.compliance_check import get_check_compliance
from prowler.lib.outputs.compliance.compliance import get_check_compliance
from prowler.lib.outputs.utils import unroll_tags
from prowler.lib.utils.utils import dict_to_lowercase, get_nested_attribute
from prowler.providers.common.provider import Provider
+18 -37
View File
@@ -25,8 +25,8 @@ from prowler.lib.utils.utils import open_file, parse_json_file, print_boxes
from prowler.providers.aws.config import (
AWS_REGION_US_EAST_1,
AWS_STS_GLOBAL_ENDPOINT_REGION,
BOTO3_USER_AGENT_EXTRA,
ROLE_SESSION_NAME,
get_default_session_config,
)
from prowler.providers.aws.exceptions.exceptions import (
AWSAccessKeyIDInvalidError,
@@ -227,15 +227,14 @@ class AwsProvider(Provider):
# TODO: Use AwsSetUpSession ?????
# Configure the initial AWS Session using the local credentials: profile or environment variables
session_config = self.set_session_config(retries_max_attempts)
aws_session = self.setup_session(
mfa=mfa,
profile=profile,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
aws_session_token=aws_session_token,
session_config=session_config,
)
session_config = self.set_session_config(retries_max_attempts)
# Current session and the original session points to the same session object until we get a new one, if needed
self._session = AWSSession(
current_session=aws_session,
@@ -631,7 +630,6 @@ class AwsProvider(Provider):
aws_access_key_id: str = None,
aws_secret_access_key: str = None,
aws_session_token: Optional[str] = None,
session_config: Optional[Config] = None,
) -> Session:
"""
setup_session sets up an AWS session using the provided credentials.
@@ -642,9 +640,6 @@ class AwsProvider(Provider):
- aws_access_key_id: The AWS access key ID.
- aws_secret_access_key: The AWS secret access key.
- aws_session_token: The AWS session token, optional.
- session_config: Botocore Config applied as the session's default
client config so every client created from the session inherits
the Prowler user agent and retry settings.
Returns:
- Session: The AWS session.
@@ -655,9 +650,6 @@ class AwsProvider(Provider):
try:
logger.debug("Creating original session ...")
if session_config is None:
session_config = AwsProvider.set_session_config(None)
session_arguments = {}
if profile:
session_arguments["profile_name"] = profile
@@ -669,7 +661,6 @@ class AwsProvider(Provider):
if mfa:
session = Session(**session_arguments)
session._session.set_default_client_config(session_config)
sts_client = session.client("sts")
# TODO: pass values from the input
@@ -682,7 +673,7 @@ class AwsProvider(Provider):
session_credentials = sts_client.get_session_token(
**get_session_token_arguments
)
mfa_session = Session(
return Session(
aws_access_key_id=session_credentials["Credentials"]["AccessKeyId"],
aws_secret_access_key=session_credentials["Credentials"][
"SecretAccessKey"
@@ -691,12 +682,8 @@ class AwsProvider(Provider):
"SessionToken"
],
)
mfa_session._session.set_default_client_config(session_config)
return mfa_session
else:
session = Session(**session_arguments)
session._session.set_default_client_config(session_config)
return session
return Session(**session_arguments)
except Exception as error:
logger.critical(
f"AWSSetUpSessionError[{error.__traceback__.tb_lineno}]: {error}"
@@ -711,7 +698,6 @@ class AwsProvider(Provider):
identity: AWSIdentityInfo,
assumed_role_configuration: AWSAssumeRoleConfiguration,
session: AWSSession,
session_config: Optional[Config] = None,
) -> Session:
"""
Sets up an assumed session using the provided assumed role credentials.
@@ -756,13 +742,6 @@ class AwsProvider(Provider):
assumed_session = BotocoreSession()
assumed_session._credentials = assumed_refreshable_credentials
assumed_session.set_config_variable("region", identity.profile_region)
if session_config is None:
session_config = (
session.session_config
if session is not None
else AwsProvider.set_session_config(None)
)
assumed_session.set_default_client_config(session_config)
return Session(
profile_name=identity.profile,
botocore_session=assumed_session,
@@ -891,7 +870,7 @@ class AwsProvider(Provider):
for region in enabled_regions:
regional_client = self._session.current_session.client(
service, region_name=region
service, region_name=region, config=self._session.session_config
)
regional_client.region = region
regional_clients[region] = regional_client
@@ -1161,16 +1140,21 @@ class AwsProvider(Provider):
Returns:
- Config: The botocore Config object
"""
default_session_config = get_default_session_config()
# Set the maximum retries for the standard retrier config
default_session_config = Config(
retries={"max_attempts": 3, "mode": "standard"},
user_agent_extra=BOTO3_USER_AGENT_EXTRA,
)
if retries_max_attempts:
default_session_config = default_session_config.merge(
Config(
retries={
"max_attempts": retries_max_attempts,
"mode": "standard",
},
)
# Create the new config
config = Config(
retries={
"max_attempts": retries_max_attempts,
"mode": "standard",
},
)
# Merge the new configuration
default_session_config = default_session_config.merge(config)
return default_session_config
@@ -1441,9 +1425,6 @@ class AwsProvider(Provider):
region_name=aws_region,
profile_name=profile,
)
session._session.set_default_client_config(
AwsProvider.set_session_config(None)
)
caller_identity = AwsProvider.validate_credentials(session, aws_region)
# Do an extra validation if the AWS account ID is provided
-9
View File
@@ -1,15 +1,6 @@
import os
from botocore.config import Config
AWS_STS_GLOBAL_ENDPOINT_REGION = "us-east-1"
AWS_REGION_US_EAST_1 = "us-east-1"
BOTO3_USER_AGENT_EXTRA = os.getenv("PROWLER_AWS_BOTO3_USER_AGENT_EXTRA", "APN_1826889")
ROLE_SESSION_NAME = "ProwlerAssessmentSession"
def get_default_session_config() -> Config:
return Config(
user_agent_extra=BOTO3_USER_AGENT_EXTRA,
retries={"max_attempts": 3, "mode": "standard"},
)
@@ -56,7 +56,9 @@ def quick_inventory(provider: AwsProvider, args):
try:
# Scan IAM only once
if not iam_was_scanned:
global_resources.extend(get_iam_resources(provider))
global_resources.extend(
get_iam_resources(provider.session.current_session)
)
iam_was_scanned = True
# Get regional S3 buckets since none-tagged buckets are not supported by the resourcegroupstaggingapi
@@ -310,8 +312,8 @@ def create_output(resources: list, provider: AwsProvider, args):
if args.output_bucket:
output_bucket = args.output_bucket
bucket_session = provider.session.current_session
# The outer condition guarantees -D was input when -B was not
else:
# Check if -D was input
elif args.output_bucket_no_assume:
output_bucket = args.output_bucket_no_assume
bucket_session = provider.session.original_session
@@ -373,9 +375,9 @@ def get_regional_buckets(provider: AwsProvider, region: str) -> list:
return regional_buckets
def get_iam_resources(provider: AwsProvider) -> list:
def get_iam_resources(session) -> list:
iam_resources = []
iam_client = provider.session.current_session.client("iam")
iam_client = session.client("iam")
try:
get_roles_paginator = iam_client.get_paginator("list_roles")
for page in get_roles_paginator.paginate():
+2 -11
View File
@@ -111,13 +111,6 @@ class S3:
- None
"""
if session:
# Preserve the caller's existing default config (and the
# retries_max_attempts already baked into it) instead of clobbering
# it with a freshly built one.
if session._session.get_default_client_config() is None:
session._session.set_default_client_config(
AwsProvider.set_session_config(retries_max_attempts)
)
self._session = session.client(__class__.__name__.lower())
else:
aws_setup_session = AwsSetUpSession(
@@ -134,7 +127,8 @@ class S3:
regions=regions,
)
self._session = aws_setup_session._session.current_session.client(
__class__.__name__.lower()
__class__.__name__.lower(),
config=aws_setup_session._session.session_config,
)
self._bucket_name = bucket_name
@@ -319,9 +313,6 @@ class S3:
region_name=aws_region,
profile_name=profile,
)
session._session.set_default_client_config(
AwsProvider.set_session_config(None)
)
s3_client = session.client(__class__.__name__.lower())
if "s3://" in bucket_name:
bucket_name = bucket_name.removeprefix("s3://")
@@ -148,13 +148,6 @@ class SecurityHub:
regions=regions,
)
self._session = aws_setup_session._session.current_session
# Only install the Prowler default config when the caller-supplied
# session does not already carry one — overwriting would drop the
# provider's retries_max_attempts value.
if aws_session and self._session._session.get_default_client_config() is None:
self._session._session.set_default_client_config(
AwsProvider.set_session_config(retries_max_attempts)
)
self._aws_account_id = aws_account_id
if not aws_partition:
aws_partition = AwsProvider.validate_credentials(
@@ -242,7 +235,7 @@ class SecurityHub:
Args:
region (str): AWS region to check.
session (Session): AWS session object. Expected to carry the Prowler default client config.
session (Session): AWS session object.
aws_account_id (str): AWS account ID.
aws_partition (str): AWS partition.
@@ -547,9 +540,6 @@ class SecurityHub:
region_name=aws_region,
profile_name=profile,
)
session._session.set_default_client_config(
AwsProvider.set_session_config(None)
)
all_regions = AwsProvider.get_available_aws_service_regions(
service="securityhub", partition=aws_partition
+2 -8
View File
@@ -32,13 +32,7 @@ class AWSService:
def is_failed_check(cls, check_id, arn):
return (check_id.split(".")[-1], arn) in cls.failed_checks
def __init__(
self,
service: str,
provider: AwsProvider,
global_service=False,
region: str = None,
):
def __init__(self, service: str, provider: AwsProvider, global_service=False):
# Audit Information
# Do we need to store the whole provider?
self.provider = provider
@@ -67,7 +61,7 @@ class AWSService:
# Get a single region and client if the service needs it (e.g. AWS Global Service)
# We cannot include this within an else because some services needs both the regional_clients
# and a single client like S3
self.region = region or provider.get_default_region(
self.region = provider.get_default_region(
self.service, global_service=global_service
)
self.client = self.session.client(self.service, self.region)
@@ -73,15 +73,15 @@ class AwsSetUpSession:
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
)
session_config = AwsProvider.set_session_config(retries_max_attempts)
# Setup the AWS session
aws_session = AwsProvider.setup_session(
mfa=mfa,
profile=profile,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
aws_session_token=aws_session_token,
session_config=session_config,
)
session_config = AwsProvider.set_session_config(retries_max_attempts)
self._session = AWSSession(
current_session=aws_session,
session_config=session_config,
@@ -1,44 +0,0 @@
{
"Provider": "aws",
"CheckID": "bedrock_guardrails_configured",
"CheckTitle": "Bedrock has at least one guardrail configured in the audited region",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices",
"Software and Configuration Checks/AWS Security Best Practices/Runtime Behavior Analysis"
],
"ServiceName": "bedrock",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Other",
"ResourceGroup": "ai_ml",
"Description": "**Amazon Bedrock guardrails** provide reusable safety policies for filtering harmful or unwanted content in model inputs and outputs.\n\nThis evaluation checks whether at least one guardrail exists in each successfully scanned region. It does **not** verify that guardrails are attached to agents or passed on individual model invocation API calls.",
"Risk": "Without any configured **Bedrock guardrails** in a region, teams lack a native reusable policy object for **content filtering** and **safety controls**. Applications may invoke models without standardized protections against **harmful content**, **prompt injection**, or **sensitive-data exposure** unless equivalent controls are enforced elsewhere.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.aws.amazon.com/bedrock/latest/userguide/guardrails.html",
"https://docs.aws.amazon.com/bedrock/latest/userguide/guardrails-create.html"
],
"Remediation": {
"Code": {
"CLI": "aws bedrock create-guardrail --name example_resource --blocked-input-messaging 'Blocked' --blocked-outputs-messaging 'Blocked' --content-policy-config 'filtersConfig=[{type=HATE,inputStrength=HIGH,outputStrength=HIGH}]'",
"NativeIaC": "```yaml\nResources:\n example_resource:\n Type: AWS::Bedrock::Guardrail\n Properties:\n Name: example_resource\n BlockedInputMessaging: \"Blocked\"\n BlockedOutputsMessaging: \"Blocked\"\n ContentPolicyConfig:\n FiltersConfig:\n - Type: HATE\n InputStrength: HIGH # Critical: configures content filtering\n OutputStrength: HIGH\n```",
"Other": "1. Open the AWS Console and go to Amazon Bedrock\n2. Select **Guardrails** from the navigation pane\n3. Click **Create guardrail**\n4. Configure content filters for harmful categories\n5. Set input and output messaging for blocked content\n6. Click **Create guardrail**",
"Terraform": "```hcl\nresource \"aws_bedrock_guardrail\" \"example_resource\" {\n name = \"example_resource\"\n blocked_input_messaging = \"Blocked\"\n blocked_outputs_messaging = \"Blocked\"\n\n content_policy_config {\n filters_config {\n type = \"HATE\" # Critical: configures content filtering\n input_strength = \"HIGH\"\n output_strength = \"HIGH\"\n }\n }\n}\n```"
},
"Recommendation": {
"Text": "Create at least one **Bedrock guardrail** in each region where Bedrock is used, then separately ensure those guardrails are attached to relevant agents and invocation paths.\n- Configure **content filters** for harmful categories (hate, violence, sexual, misconduct)\n- Add **sensitive information filters** and **denied topic policies**\n- Apply guardrails at the API call level using `guardrailIdentifier` where supported",
"Url": "https://hub.prowler.com/check/bedrock_guardrails_configured"
}
},
"Categories": [
"gen-ai"
],
"DependsOn": [],
"RelatedTo": [
"bedrock_guardrail_prompt_attack_filter_enabled",
"bedrock_guardrail_sensitive_information_filter_enabled",
"bedrock_agent_guardrail_enabled"
],
"Notes": "This check validates guardrail existence per successfully scanned region. It does not verify attachment to agents or the use of guardrails on model invocations. Regions where Bedrock guardrails cannot be enumerated are skipped to avoid false failures."
}
@@ -1,50 +0,0 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.bedrock.bedrock_client import bedrock_client
class bedrock_guardrails_configured(Check):
"""Ensure Bedrock guardrails are configured in successfully scanned regions.
This check verifies that at least one Amazon Bedrock guardrail is configured
in each successfully scanned region.
- PASS: At least one Bedrock guardrail is configured in the region.
- FAIL: No Bedrock guardrails are configured in the region.
"""
def execute(self) -> list[Check_Report_AWS]:
"""Execute the check logic.
Returns:
A list of reports containing the result of the check.
"""
findings = []
for region in sorted(bedrock_client.guardrails_scanned_regions):
regional_guardrails = sorted(
(
guardrail
for guardrail in bedrock_client.guardrails.values()
if guardrail.region == region
),
key=lambda guardrail: guardrail.name,
)
if regional_guardrails:
for guardrail in regional_guardrails:
report = Check_Report_AWS(
metadata=self.metadata(), resource=guardrail
)
report.status = "PASS"
report.status_extended = f"Bedrock guardrail {guardrail.name} is available in region {region}. This does not confirm that the guardrail is attached to agents or used on model invocations."
findings.append(report)
else:
report = Check_Report_AWS(metadata=self.metadata(), resource={})
report.region = region
report.resource_id = "bedrock-guardrails"
report.resource_arn = f"arn:{bedrock_client.audited_partition}:bedrock:{region}:{bedrock_client.audited_account}:guardrails"
report.status = "FAIL"
report.status_extended = (
f"Bedrock has no guardrails configured in region {region}."
)
findings.append(report)
return findings
@@ -1,39 +0,0 @@
{
"Provider": "aws",
"CheckID": "bedrock_prompt_management_exists",
"CheckTitle": "Amazon Bedrock Prompt Management prompts exist in the region",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices"
],
"ServiceName": "bedrock",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "low",
"ResourceType": "Other",
"ResourceGroup": "ai_ml",
"Description": "**Bedrock Prompt Management** enables centralized creation, versioning, and governance of prompts used with foundation models.\n\nThis region-level check verifies whether at least one managed prompt exists in each scanned region, used as an adoption signal for Prompt Management. The presence of a prompt does not by itself guarantee that every application prompt is managed.",
"Risk": "Without **Prompt Management**, prompts are scattered across applications with no central oversight, versioning, or auditability over instructions sent to foundation models, weakening governance and compliance posture.\n\nManaged prompts are a governance enabler; **prompt injection** defenses are provided by Bedrock **guardrails**, covered by separate checks.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.aws.amazon.com/bedrock/latest/userguide/prompt-management.html",
"https://docs.aws.amazon.com/bedrock/latest/userguide/prompt-management-create.html"
],
"Remediation": {
"Code": {
"CLI": "aws bedrock-agent create-prompt --name example_prompt --default-variant default --variants '[{\"name\":\"default\",\"templateType\":\"TEXT\",\"templateConfiguration\":{\"text\":{\"text\":\"Your prompt template here.\"}}}]'",
"NativeIaC": "",
"Other": "1. Open the Amazon Bedrock console\n2. Navigate to Prompt Management\n3. Click Create prompt\n4. Provide a name and configure the prompt template (a prompt can contain at most one variant; additional variants are created via CreatePromptVersion)\n5. Save the prompt",
"Terraform": ""
},
"Recommendation": {
"Text": "Adopt **Bedrock Prompt Management** to centralize prompt definitions, enforce versioning, and maintain governance over model interactions.\n\nUse managed prompts with **guardrails** and apply **least privilege** access controls to restrict who can create or modify prompts.",
"Url": "https://hub.prowler.com/check/bedrock_prompt_management_exists"
}
},
"Categories": [
"gen-ai"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Results are generated per scanned region. Regions where `ListPrompts` cannot be queried are omitted from the findings."
}
@@ -1,54 +0,0 @@
"""Check for region-level Bedrock Prompt Management adoption."""
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.bedrock.bedrock_agent_client import (
bedrock_agent_client,
)
class bedrock_prompt_management_exists(Check):
"""Check whether Amazon Bedrock Prompt Management prompts exist in the region.
A region is reported only when ListPrompts succeeded for it; regions where
the API call failed (e.g. AccessDenied, unsupported region) are skipped at
the service layer and produce no finding.
- PASS: At least one managed prompt exists in the region (one finding per prompt).
- FAIL: No managed prompts exist in the region (one finding per region).
"""
def execute(self) -> list[Check_Report_AWS]:
"""Execute the Bedrock Prompt Management exists check.
Returns:
A list of reports containing the result of the check.
"""
findings = []
for region in sorted(bedrock_agent_client.prompt_scanned_regions):
regional_prompts = sorted(
(
prompt
for prompt in bedrock_agent_client.prompts.values()
if prompt.region == region
),
key=lambda prompt: prompt.name,
)
if regional_prompts:
for prompt in regional_prompts:
report = Check_Report_AWS(metadata=self.metadata(), resource=prompt)
report.status = "PASS"
report.status_extended = f"Bedrock Prompt Management prompt {prompt.name} exists in region {region}."
findings.append(report)
else:
report = Check_Report_AWS(metadata=self.metadata(), resource={})
report.region = region
report.resource_id = "prompt-management"
report.resource_arn = f"arn:{bedrock_agent_client.audited_partition}:bedrock:{region}:{bedrock_agent_client.audited_account}:prompt-management"
report.status = "FAIL"
report.status_extended = (
f"No Bedrock Prompt Management prompts exist in region {region}."
)
findings.append(report)
return findings
@@ -1,6 +1,5 @@
from typing import Optional
from botocore.exceptions import ClientError
from pydantic.v1 import BaseModel
from prowler.lib.logger import logger
@@ -14,8 +13,6 @@ class Bedrock(AWSService):
super().__init__(__class__.__name__, provider)
self.logging_configurations = {}
self.guardrails = {}
self.guardrails_scanned_regions = set()
self.guardrails_scan_errors = {}
self.__threading_call__(self._get_model_invocation_logging_configuration)
self.__threading_call__(self._list_guardrails)
self.__threading_call__(self._get_guardrail, self.guardrails.values())
@@ -70,18 +67,7 @@ class Bedrock(AWSService):
arn=guardrail["arn"],
region=regional_client.region,
)
self.guardrails_scanned_regions.add(regional_client.region)
except ClientError as error:
self.guardrails_scan_errors[regional_client.region] = error.response[
"Error"
].get("Code", error.__class__.__name__)
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
self.guardrails_scan_errors[regional_client.region] = (
error.__class__.__name__
)
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
@@ -140,10 +126,7 @@ class BedrockAgent(AWSService):
# Call AWSService's __init__
super().__init__("bedrock-agent", provider)
self.agents = {}
self.prompts = {}
self.prompt_scanned_regions: set = set()
self.__threading_call__(self._list_agents)
self.__threading_call__(self._list_prompts)
self.__threading_call__(self._list_tags_for_resource, self.agents.values())
def _list_agents(self, regional_client):
@@ -170,32 +153,7 @@ class BedrockAgent(AWSService):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _list_prompts(self, regional_client):
"""List all prompts in a region.
Prompt Management is evaluated as a region-level adoption signal, so
prompt collection is intentionally not filtered by audit_resources.
"""
logger.info("Bedrock Agent - Listing Prompts...")
try:
paginator = regional_client.get_paginator("list_prompts")
for page in paginator.paginate():
for prompt in page.get("promptSummaries", []):
prompt_arn = prompt.get("arn", "")
self.prompts[prompt_arn] = Prompt(
id=prompt.get("id", ""),
name=prompt.get("name", ""),
arn=prompt_arn,
region=regional_client.region,
)
self.prompt_scanned_regions.add(regional_client.region)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _list_tags_for_resource(self, resource):
"""List tags for a Bedrock Agent resource."""
logger.info("Bedrock Agent - Listing Tags for Resource...")
try:
agent_tags = (
@@ -218,12 +176,3 @@ class Agent(BaseModel):
guardrail_id: Optional[str] = None
region: str
tags: Optional[list] = []
class Prompt(BaseModel):
"""Model representing a Bedrock Prompt Management prompt."""
id: str
name: str
arn: str
region: str
@@ -1,5 +1,4 @@
import datetime
from concurrent.futures import as_completed
from typing import List, Optional
from pydantic.v1 import BaseModel
@@ -15,9 +14,9 @@ class Codebuild(AWSService):
super().__init__(__class__.__name__, provider)
self.projects = {}
self.__threading_call__(self._list_projects)
self.__threading_call__(self._list_builds_for_project)
self.__threading_call__(self._batch_get_builds)
self.__threading_call__(self._batch_get_projects)
self.__threading_call__(self._list_builds_for_project, self.projects.values())
self.__threading_call__(self._batch_get_builds, self.projects.values())
self.__threading_call__(self._batch_get_projects, self.projects.values())
self.report_groups = {}
self.__threading_call__(self._list_report_groups)
self.__threading_call__(
@@ -45,8 +44,10 @@ class Codebuild(AWSService):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _fetch_project_last_build(self, regional_client, project):
def _list_builds_for_project(self, project):
logger.info("Codebuild - Listing builds...")
try:
regional_client = self.regional_clients[project.region]
build_ids = regional_client.list_builds_for_project(
projectName=project.name
).get("ids", [])
@@ -57,99 +58,28 @@ class Codebuild(AWSService):
f"{project.region}: {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _list_builds_for_project(self, regional_client):
logger.info("Codebuild - Listing builds...")
try:
regional_projects = [
project
for project in self.projects.values()
if project.region == regional_client.region
]
# list_builds_for_project has no batch API equivalent, so reuse the
# shared thread pool to issue per-project calls in parallel within
# this region — preserving the wall-clock performance of the
# previous implementation.
futures = [
self.thread_pool.submit(
self._fetch_project_last_build, regional_client, project
)
for project in regional_projects
]
for future in as_completed(futures):
try:
future.result()
except Exception:
pass
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _batch_get_builds(self, regional_client):
def _batch_get_builds(self, project):
logger.info("Codebuild - Getting builds...")
try:
# Collect all build IDs for this region
build_id_to_project = {}
for project in self.projects.values():
if (
project.region == regional_client.region
and project.last_build
and project.last_build.id
):
build_id_to_project[project.last_build.id] = project
if not build_id_to_project:
return
build_ids = list(build_id_to_project.keys())
# batch_get_builds supports up to 100 IDs per call
for i in range(0, len(build_ids), 100):
batch = build_ids[i : i + 100]
response = regional_client.batch_get_builds(ids=batch)
for build_info in response.get("builds", []):
build_id = build_info.get("id")
if build_id in build_id_to_project:
end_time = build_info.get("endTime")
if end_time:
build_id_to_project[build_id].last_invoked_time = end_time
if project.last_build and project.last_build.id:
regional_client = self.regional_clients[project.region]
builds_by_id = regional_client.batch_get_builds(
ids=[project.last_build.id]
).get("builds", [])
if len(builds_by_id) > 0:
project.last_invoked_time = builds_by_id[0].get("endTime")
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{regional_client.region}: {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _batch_get_projects(self, regional_client):
def _batch_get_projects(self, project):
logger.info("Codebuild - Getting projects...")
try:
# Collect all project names for this region
regional_projects = {
arn: project
for arn, project in self.projects.items()
if project.region == regional_client.region
}
if not regional_projects:
return
project_names = [project.name for project in regional_projects.values()]
# batch_get_projects supports up to 100 names per call
for i in range(0, len(project_names), 100):
batch = project_names[i : i + 100]
response = regional_client.batch_get_projects(names=batch)
for project_info in response.get("projects", []):
project_arn = project_info.get("arn")
if project_arn in regional_projects:
self._parse_project_info(
regional_projects[project_arn], project_info
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _parse_project_info(self, project, project_info):
try:
regional_client = self.regional_clients[project.region]
project_info = regional_client.batch_get_projects(names=[project.name])[
"projects"
][0]
project.buildspec = project_info["source"].get("buildspec")
if project_info["source"]["type"] != "NO_SOURCE":
project.source = Source(
@@ -9,13 +9,15 @@ from prowler.providers.aws.lib.service.service import AWSService
class GlobalAccelerator(AWSService):
def __init__(self, provider):
# Global Accelerator is a global service that supports endpoints in multiple AWS Regions
# but you must specify the US West (Oregon) Region to create, update, or otherwise work with accelerators.
# That is, for example, specify --region us-west-2 on AWS CLI commands.
region = "us-west-2" if provider.identity.partition == "aws" else None
super().__init__(__class__.__name__, provider, region=region)
# Call AWSService's __init__
super().__init__(__class__.__name__, provider)
self.accelerators = {}
if self.audited_partition == "aws":
# Global Accelerator is a global service that supports endpoints in multiple AWS Regions
# but you must specify the US West (Oregon) Region to create, update, or otherwise work with accelerators.
# That is, for example, specify --region us-west-2 on AWS CLI commands.
self.region = "us-west-2"
self.client = self.session.client(self.service, self.region)
self._list_accelerators()
self.__threading_call__(self._list_tags, self.accelerators.values())
@@ -176,12 +176,14 @@ class RecordSet(BaseModel):
class Route53Domains(AWSService):
def __init__(self, provider):
# Route53Domains is a global service that supports endpoints in multiple AWS Regions
# but you must specify the US East (N. Virginia) Region to create, update, or otherwise work with domains.
region = "us-east-1" if provider.identity.partition == "aws" else None
super().__init__(__class__.__name__, provider, region=region)
# Call AWSService's __init__
super().__init__(__class__.__name__, provider)
self.domains = {}
if self.audited_partition == "aws":
# Route53Domains is a global service that supports endpoints in multiple AWS Regions
# but you must specify the US East (N. Virginia) Region to create, update, or otherwise work with domains.
self.region = "us-east-1"
self.client = self.session.client(self.service, self.region)
self._list_domains()
self._get_domain_detail()
self._list_tags_for_domain()
@@ -9,20 +9,20 @@ from prowler.providers.aws.lib.service.service import AWSService
class TrustedAdvisor(AWSService):
def __init__(self, provider):
# Support API is not available in China Partition
# But only in us-east-1 or us-gov-west-1 https://docs.aws.amazon.com/general/latest/gr/awssupport.html
partition = provider.identity.partition
if partition == "aws":
support_region = "us-east-1"
elif partition == "aws-cn":
support_region = None
else:
support_region = "us-gov-west-1"
super().__init__("support", provider, region=support_region)
# Call AWSService's __init__
super().__init__("support", provider)
self.account_arn_template = f"arn:{self.audited_partition}:trusted-advisor:{self.region}:{self.audited_account}:account"
self.checks = []
self.premium_support = PremiumSupport(enabled=False)
# Support API is not available in China Partition
# But only in us-east-1 or us-gov-west-1 https://docs.aws.amazon.com/general/latest/gr/awssupport.html
if self.audited_partition != "aws-cn":
if self.audited_partition == "aws":
support_region = "us-east-1"
else:
support_region = "us-gov-west-1"
self.client = self.session.client(self.service, region_name=support_region)
self.client.region = support_region
self._describe_services()
if getattr(self.premium_support, "enabled", False):
self._describe_trusted_advisor_checks()
@@ -34,13 +34,13 @@ class TrustedAdvisor(AWSService):
for check in self.client.describe_trusted_advisor_checks(language="en").get(
"checks", []
):
check_arn = f"arn:{self.audited_partition}:trusted-advisor:{self.region}:{self.audited_account}:check/{check['id']}"
check_arn = f"arn:{self.audited_partition}:trusted-advisor:{self.client.region}:{self.audited_account}:check/{check['id']}"
self.checks.append(
Check(
id=check["id"],
name=check["name"],
arn=check_arn,
region=self.region,
region=self.client.region,
)
)
except ClientError as error:
@@ -50,22 +50,22 @@ class TrustedAdvisor(AWSService):
== "Amazon Web Services Premium Support Subscription is required to use this service."
):
logger.warning(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{self.client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{self.client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{self.client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _describe_trusted_advisor_check_result(self):
logger.info("TrustedAdvisor - Describing Check Result...")
try:
for check in self.checks:
if check.region == self.region:
if check.region == self.client.region:
try:
response = self.client.describe_trusted_advisor_check_result(
checkId=check.id
@@ -78,11 +78,11 @@ class TrustedAdvisor(AWSService):
== "InvalidParameterValueException"
):
logger.warning(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{self.client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{self.client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _describe_services(self):
@@ -9,13 +9,15 @@ from prowler.providers.aws.lib.service.service import AWSService
class WAF(AWSService):
def __init__(self, provider):
# AWS WAF is available globally for CloudFront distributions, but you must use the Region US East (N. Virginia) to create your web ACL and any resources used in the web ACL, such as rule groups, IP sets, and regex pattern sets.
region = "us-east-1" if provider.identity.partition == "aws" else None
super().__init__("waf", provider, region=region)
# Call AWSService's __init__
super().__init__("waf", provider)
self.rules = {}
self.rule_groups = {}
self.web_acls = {}
if self.audited_partition == "aws":
# AWS WAF is available globally for CloudFront distributions, but you must use the Region US East (N. Virginia) to create your web ACL and any resources used in the web ACL, such as rule groups, IP sets, and regex pattern sets.
self.region = "us-east-1"
self.client = self.session.client(self.service, self.region)
self._list_rules()
self.__threading_call__(self._get_rule, self.rules.values())
self._list_rule_groups()
@@ -11,11 +11,13 @@ from prowler.providers.aws.lib.service.service import AWSService
class WAFv2(AWSService):
def __init__(self, provider):
# AWS WAFv2 is available globally for CloudFront distributions, but you must use the Region US East (N. Virginia) to create your web ACL.
region = "us-east-1" if provider.identity.partition == "aws" else None
super().__init__(__class__.__name__, provider, region=region)
# Call AWSService's __init__
super().__init__(__class__.__name__, provider)
self.web_acls = {}
if self.audited_partition == "aws":
# AWS WAFv2 is available globally for CloudFront distributions, but you must use the Region US East (N. Virginia) to create your web ACL.
self.region = "us-east-1"
self.client = self.session.client(self.service, self.region)
self._list_web_acls_global()
self.__threading_call__(self._list_web_acls_regional)
self.__threading_call__(self._get_web_acl, self.web_acls.values())
@@ -13,7 +13,7 @@
"Risk": "Allowing `TLS 1.0/1.1` enables protocol downgrades and weak cipher negotiation, exposing HTTPS traffic to **MITM** interception, credential theft, and tampering. This undermines the **confidentiality** and **integrity** of sessions and data in transit, and can enable account takeover via stolen tokens.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://learn.microsoft.com/en-us/azure/app-service/overview-tls",
"https://learn.microsoft.com/en-us/+azure/app-service/overview-tls",
"https://learn.microsoft.com/en-us/azure/app-service/configure-ssl-bindings#enforce-tls-versions",
"https://www.trendmicro.com/cloudoneconformity/knowledge-base/azure/AppService/latest-version-of-tls-encryption-in-use.html",
"https://icompaas.freshdesk.com/support/solutions/articles/62000234773-ensure-that-minimum-tls-version-is-set-to-tls-v1-2-or-higher"
@@ -9,8 +9,8 @@
"Severity": "high",
"ResourceType": "microsoft.network/networkwatchers",
"ResourceGroup": "network",
"Description": "**Azure Network Watcher** has **flow logs** enabled for supported targets, such as **virtual networks** and **network security groups**, and configured with **Traffic Analytics** to forward records to a centralized **Log Analytics workspace**",
"Risk": "Missing, disabled, or non-centralized flow logging blinds visibility into network behavior, hindering detection of:\n- **Lateral movement** and internal scanning\n- **C2 beacons** and exfiltration patterns\nThis degrades incident response and correlation, impacting **confidentiality** and **integrity**.",
"Description": "**Azure Network Watcher** has **NSG flow logs** enabled and configured to forward traffic records to a centralized **Log Analytics workspace**",
"Risk": "Missing or disabled flow logging blinds visibility into network behavior, hindering detection of:\n- **Lateral movement** and internal scanning\n- **C2 beacons** and exfiltration patterns\nThis degrades incident response and correlation, impacting **confidentiality** and **integrity**.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://learn.microsoft.com/en-us/azure/network-watcher/vnet-flow-logs-tutorial",
@@ -18,13 +18,13 @@
],
"Remediation": {
"Code": {
"CLI": "az network watcher flow-log create --location <REGION> --name <FLOW_LOG_NAME> --resource-group <RESOURCE_GROUP> --target-resource-id <TARGET_RESOURCE_ID> --storage-account <STORAGE_ACCOUNT_ID> --enabled true --workspace <LOG_ANALYTICS_WORKSPACE_ID>",
"NativeIaC": "```bicep\n// Enable flow logs for a supported target (for example, a virtual network or NSG)\nresource flowLog 'Microsoft.Network/networkWatchers/flowLogs@2023-09-01' = {\n name: '<example_network_watcher_name>/<example_flow_log_name>'\n location: '<REGION>'\n properties: {\n enabled: true\n targetResourceId: '<example_target_resource_id>'\n storageId: '<example_storage_account_id>'\n flowAnalyticsConfiguration: {\n networkWatcherFlowAnalyticsConfiguration: {\n enabled: true\n workspaceResourceId: '<example_log_analytics_workspace_id>'\n }\n }\n }\n}\n```",
"Other": "1. In Azure portal, go to Network Watcher > Flow logs\n2. Click + Create\n3. Select the subscription and region\n4. Choose the appropriate flow log type and target resource, such as a virtual network or network security group\n5. Set Status to On\n6. Select a Storage account\n7. Enable Traffic analytics and select the Log Analytics workspace\n8. Click Review + create, then Create",
"Terraform": "```hcl\n# Enable flow logs for a supported target and send analytics to Log Analytics\nresource \"azurerm_network_watcher_flow_log\" \"<example_resource_name>\" {\n name = \"<example_resource_name>\"\n network_watcher_name = \"<example_network_watcher_name>\"\n resource_group_name = \"<example_resource_group_name>\"\n target_resource_id = \"<example_target_resource_id>\"\n storage_account_id = \"<example_storage_account_id>\"\n\n enabled = true\n\n traffic_analytics {\n enabled = true\n workspace_id = \"<example_workspace_id>\"\n workspace_region = \"<REGION>\"\n workspace_resource_id = \"<example_log_analytics_workspace_id>\"\n }\n}\n```"
"CLI": "az network watcher flow-log create --location <REGION> --name <FLOW_LOG_NAME> --resource-group <RESOURCE_GROUP> --nsg <NSG_NAME> --storage-account <STORAGE_ACCOUNT_NAME> --enabled true --workspace <LOG_ANALYTICS_WORKSPACE_ID>",
"NativeIaC": "```bicep\n// Enable NSG flow logs and send to Log Analytics\nresource flowLog 'Microsoft.Network/networkWatchers/flowLogs@2022-09-01' = {\n name: '<example_resource_name>/<example_resource_name>'\n location: '<REGION>'\n properties: {\n enabled: true // CRITICAL: turns on flow logs\n targetResourceId: '<example_resource_id>' // NSG resource ID\n storageId: '<example_resource_id>' // required for NSG flow logs\n flowAnalyticsConfiguration: {\n networkWatcherFlowAnalyticsConfiguration: {\n enabled: true // CRITICAL: sends flow logs to Log Analytics\n workspaceResourceId: '<example_resource_id>' // Log Analytics workspace resource ID\n }\n }\n }\n}\n```",
"Other": "1. In Azure portal, go to Network Watcher > Flow logs\n2. Click + Create (or Create flow log)\n3. Select the target NSG and region\n4. Set Status to On\n5. Select a Storage account\n6. Enable Traffic analytics, then select your Log Analytics workspace\n7. Click Review + create, then Create",
"Terraform": "```hcl\n# Enable NSG flow logs and send to Log Analytics\nresource \"azurerm_network_watcher_flow_log\" \"<example_resource_name>\" {\n network_watcher_name = \"<example_resource_name>\"\n resource_group_name = \"<example_resource_name>\"\n network_security_group_id = \"<example_resource_id>\"\n storage_account_id = \"<example_resource_id>\"\n\n enabled = true # CRITICAL: turns on flow logs\n\n traffic_analytics { \n enabled = true # CRITICAL: sends flow logs to Log Analytics\n workspace_id = \"<example_resource_id>\" # workspace_id (GUID) or use data source\n workspace_region = \"<REGION>\"\n workspace_resource_id = \"<example_resource_id>\" # Log Analytics workspace resource ID\n }\n}\n```"
},
"Recommendation": {
"Text": "Enable and centrally aggregate **flow logs** for supported Network Watcher targets, including **virtual networks** and **network security groups**, to a **Log Analytics workspace**.\n\n- Enforce least privilege on log data\n- Define retention and secure storage\n- Use layered monitoring (e.g., Traffic Analytics)\n- Ensure coverage across regions, subscriptions, and critical network segments",
"Text": "Enable and centrally aggregate **NSG flow logs** to a **Log Analytics workspace**.\n\n- Enforce least privilege on log data\n- Define retention and secure storage\n- Use layered monitoring (e.g., Traffic Analytics)\n- Ensure coverage across regions/subscriptions and critical NSGs",
"Url": "https://hub.prowler.com/check/network_flow_log_captured_sent"
}
},
@@ -34,5 +34,5 @@
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Configuring flow logs and Traffic Analytics increases storage and analytics costs. For new Azure deployments, prefer virtual network flow logs where they satisfy your monitoring requirements because NSG flow logs are on the retirement path."
"Notes": "The impact of configuring NSG Flow logs is primarily one of cost and configuration. If deployed, it will create storage accounts that hold minimal amounts of data on a 5-day lifecycle before feeding to Log Analytics Workspace. This will increase the amount of data stored and used by Azure Monitor."
}
@@ -11,26 +11,16 @@ class network_flow_log_captured_sent(Check):
metadata=self.metadata(), resource=network_watcher
)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"Network Watcher {network_watcher.name} from subscription {subscription} has no flow logs"
if network_watcher.flow_logs:
report.status = "PASS"
report.status_extended = f"Network Watcher {network_watcher.name} from subscription {subscription} has flow logs that are captured and sent to Log Analytics workspace"
has_failed = False
for flow_log in network_watcher.flow_logs:
if not has_failed:
if not flow_log.enabled:
report.status = "FAIL"
report.status_extended = f"Network Watcher {network_watcher.name} from subscription {subscription} has flow logs disabled"
has_failed = True
elif not (
flow_log.traffic_analytics_enabled
and flow_log.workspace_resource_id
):
report.status = "FAIL"
report.status_extended = f"Network Watcher {network_watcher.name} from subscription {subscription} has enabled flow logs that are not configured to send traffic analytics to a Log Analytics workspace"
has_failed = True
else:
report.status = "FAIL"
report.status_extended = f"Network Watcher {network_watcher.name} from subscription {subscription} has no flow logs"
report.status_extended = f"Network Watcher {network_watcher.name} from subscription {subscription} has flow logs disabled"
for flow_log in network_watcher.flow_logs:
if flow_log.enabled:
report.status = "PASS"
report.status_extended = f"Network Watcher {network_watcher.name} from subscription {subscription} has flow logs that are captured and sent to Log Analytics workspace"
break
findings.append(report)
@@ -9,8 +9,8 @@
"Severity": "medium",
"ResourceType": "microsoft.network/networkwatchers",
"ResourceGroup": "network",
"Description": "**Azure Network Watcher** has **flow logs** enabled for supported targets, such as **virtual networks** and **network security groups**, and configured to retain for at least `90` days (or `0` for unlimited). The evaluation checks that flow logging is enabled and that the retention policy meets the required duration for each configured log.",
"Risk": "Absent or short-retained **flow logs** reduce visibility into IP flows, delaying detection of port scans, brute force, data exfiltration, and lateral movement.\n\nForensics and accountability degrade, threatening **confidentiality** and **integrity**.",
"Description": "**Azure Network Watcher** has **NSG flow logs** enabled and configured to retain for at least `90` days (or `0` for unlimited). The evaluation checks that flow logging is enabled and that the retention policy meets the required duration for each configured log.",
"Risk": "Absent or short-retained **NSG flow logs** reduce visibility into IP flows, delaying detection of port scans, brute force, data exfiltration, and lateral movement.\n\nForensics and accountability degrade, threatening **confidentiality** and **integrity**.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://learn.microsoft.com/en-us/cli/azure/network/watcher/flow-log?view=azure-cli-latest",
@@ -20,13 +20,13 @@
],
"Remediation": {
"Code": {
"CLI": "az network watcher flow-log create --location <LOCATION> --name <example_flow_log_name> --target-resource-id <example_target_resource_id> --storage-account <example_storage_account_id> --enabled true --retention 90",
"NativeIaC": "```bicep\n// Enable flow logs with retention >= 90 days for a supported target\nresource flowlog 'Microsoft.Network/networkWatchers/flowLogs@2023-09-01' = {\n name: '<example_network_watcher_name>/<example_flow_log_name>'\n location: '<LOCATION>'\n properties: {\n targetResourceId: '<example_target_resource_id>'\n storageId: '<example_storage_account_id>'\n enabled: true\n retentionPolicy: {\n enabled: true\n days: 90\n }\n }\n}\n```",
"Other": "1. In Azure Portal, go to Network Watcher > Flow logs\n2. Select the relevant flow log or create one for the target resource, such as a virtual network or network security group\n3. Set Status to On\n4. Set Retention (days) to 0 (unlimited) or at least 90\n5. Select a Storage account\n6. Click Save or Review + create",
"Terraform": "```hcl\n# Enable flow logs with retention >= 90 days\nresource \"azurerm_network_watcher_flow_log\" \"<example_resource_name>\" {\n name = \"<example_resource_name>\"\n network_watcher_name = \"<example_network_watcher_name>\"\n resource_group_name = \"<example_resource_group_name>\"\n target_resource_id = \"<example_target_resource_id>\"\n storage_account_id = \"<example_storage_account_id>\"\n\n enabled = true\n\n retention_policy {\n enabled = true\n days = 90\n }\n}\n```"
"CLI": "az network watcher flow-log create --location <LOCATION> --name <example_resource_name> --nsg <example_resource_id> --storage-account <example_resource_id> --retention 90",
"NativeIaC": "```bicep\n// Enable NSG flow logs with retention >= 90 days\nresource flowlog 'Microsoft.Network/networkWatchers/flowLogs@2023-09-01' = {\n name: '<example_resource_name>/<example_resource_name>'\n location: '<LOCATION>'\n properties: {\n targetResourceId: '<example_resource_id>'\n storageId: '<example_resource_id>'\n enabled: true // critical: turns on flow logs\n retentionPolicy: {\n enabled: true // critical: activates retention policy\n days: 90 // critical: 0 (unlimited) or >= 90 to pass\n }\n }\n}\n```",
"Other": "1. In Azure Portal, go to Network Watcher > NSG flow logs\n2. Select the NSG to configure\n3. Set Status to On\n4. Set Retention (days) to 0 (unlimited) or at least 90\n5. Select a Storage account\n6. Click Save",
"Terraform": "```hcl\n# Enable NSG flow logs with retention >= 90 days\nresource \"azurerm_network_watcher_flow_log\" \"<example_resource_name>\" {\n name = \"<example_resource_name>\"\n network_watcher_name = \"<example_resource_name>\"\n resource_group_name = \"<example_resource_name>\"\n target_resource_id = \"<example_resource_id>\"\n storage_account_id = \"<example_resource_id>\"\n\n enabled = true # critical: turns on flow logs\n\n retention_policy {\n enabled = true # critical: activates retention policy\n days = 90 # critical: 0 (unlimited) or >= 90 to pass\n }\n}\n```"
},
"Recommendation": {
"Text": "Enable **flow logs** and keep retention `90` days (`0` for unlimited) for supported targets, including **virtual networks** and **network security groups**. Restrict and monitor access to logs, store immutably, and stream to a SIEM to detect anomalies. Apply **defense in depth** and **least privilege**. Prefer **virtual network flow logs** for new deployments as NSG flow logs are being retired.",
"Text": "Enable **NSG flow logs** and keep retention `90` days (`0` for unlimited). Restrict and monitor access to logs, store immutably, and stream to a SIEM to detect anomalies. Apply **defense in depth** and **least privilege**. Plan migration to **Virtual network flow logs** as NSG flow logs are being retired.",
"Url": "https://hub.prowler.com/check/network_flow_log_more_than_90_days"
}
},
@@ -36,5 +36,5 @@
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Longer retention improves investigation depth but increases storage cost. For new Azure deployments, prefer virtual network flow logs where they satisfy your monitoring requirements because NSG flow logs are on the retirement path."
"Notes": "This will keep IP traffic logs for longer than 90 days. As a level 2, first determine your need to retain data, then apply your selection here. As this is data stored for longer, your monthly storage costs will increase depending on your data use."
}
@@ -79,9 +79,6 @@ class Network(AzureService):
id=flow_log.id,
name=flow_log.name,
enabled=flow_log.enabled,
target_resource_id=getattr(
flow_log, "target_resource_id", None
),
retention_policy=RetentionPolicy(
enabled=(
flow_log.retention_policy.enabled
@@ -94,34 +91,6 @@ class Network(AzureService):
else 0
),
),
traffic_analytics_enabled=bool(
getattr(
getattr(
getattr(
flow_log,
"flow_analytics_configuration",
None,
),
"network_watcher_flow_analytics_configuration",
None,
),
"enabled",
False,
)
),
workspace_resource_id=getattr(
getattr(
getattr(
flow_log,
"flow_analytics_configuration",
None,
),
"network_watcher_flow_analytics_configuration",
None,
),
"workspace_resource_id",
None,
),
)
for flow_log in flow_logs
],
@@ -223,9 +192,6 @@ class FlowLog:
name: str
enabled: bool
retention_policy: RetentionPolicy
target_resource_id: Optional[str] = None
traffic_analytics_enabled: bool = False
workspace_resource_id: Optional[str] = None
@dataclass
+5 -15
View File
@@ -329,21 +329,12 @@ class ImageProvider(Provider):
"""Image provider doesn't need a session since it uses Trivy directly"""
return None
@staticmethod
def _strip_scheme(value: str) -> str:
"""Remove a leading http:// or https:// scheme from a registry input."""
for prefix in ("https://", "http://"):
if value.lower().startswith(prefix):
return value[len(prefix) :]
return value
@staticmethod
def _extract_registry(image: str) -> str | None:
"""Extract registry hostname from an image reference.
Returns None for Docker Hub images (no registry prefix).
"""
image = ImageProvider._strip_scheme(image)
parts = image.split("/")
if len(parts) >= 2 and ("." in parts[0] or ":" in parts[0]):
return parts[0]
@@ -357,7 +348,6 @@ class ImageProvider(Provider):
or "myregistry.com:5000" are registry URLs (dots in host, no slash).
Image references like "alpine:3.18" or "nginx" are not.
"""
image_uid = ImageProvider._strip_scheme(image_uid)
if "/" not in image_uid:
host_part = image_uid.split(":")[0]
if "." in host_part:
@@ -845,9 +835,11 @@ class ImageProvider(Provider):
image_ref = f"{repo}:{tag}"
else:
# OCI registries need the full host/repo:tag reference
registry_host = ImageProvider._strip_scheme(
self.registry.rstrip("/")
)
registry_host = self.registry.rstrip("/")
for prefix in ("https://", "http://"):
if registry_host.startswith(prefix):
registry_host = registry_host[len(prefix) :]
break
image_ref = f"{registry_host}/{repo}:{tag}"
discovered_images.append(image_ref)
@@ -985,8 +977,6 @@ class ImageProvider(Provider):
if not image:
return Connection(is_connected=False, error="Image name is required")
image = ImageProvider._strip_scheme(image)
# Registry URL (bare hostname) → test via OCI catalog
if ImageProvider._is_registry_url(image):
return ImageProvider._test_registry_connection(
+10 -140
View File
@@ -2,51 +2,21 @@
from __future__ import annotations
import ipaddress
import re
import socket
import time
from abc import ABC, abstractmethod
from urllib.parse import urlparse
import requests
import tldextract
from prowler.config.config import prowler_version
from prowler.lib.logger import logger
from prowler.providers.image.exceptions.exceptions import (
ImageRegistryAuthError,
ImageRegistryNetworkError,
)
from prowler.providers.image.exceptions.exceptions import ImageRegistryNetworkError
_MAX_RETRIES = 3
_BACKOFF_BASE = 1
_USER_AGENT = f"Prowler/{prowler_version} (registry-adapter)"
_NON_PUBLIC_IP_PROPERTIES = (
"is_private",
"is_loopback",
"is_link_local",
"is_multicast",
"is_reserved",
"is_unspecified",
)
def _ip_is_non_public(ip_str: str) -> bool:
try:
addr = ipaddress.ip_address(ip_str)
except ValueError:
return False
return any(getattr(addr, prop) for prop in _NON_PUBLIC_IP_PROPERTIES)
def _registrable_domain(host: str) -> str | None:
ext = tldextract.extract(host)
if not ext.domain or not ext.suffix:
return None
return f"{ext.domain}.{ext.suffix}"
class RegistryAdapter(ABC):
"""Abstract base class for registry adapters."""
@@ -98,107 +68,6 @@ class RegistryAdapter(ABC):
"""Enumerate all tags for a repository."""
...
def _origin_url(self) -> str:
"""The URL whose host the validator compares against when enforce_origin=True.
Subclasses can override if the effective registry origin differs from
``registry_url`` (e.g., Docker Hub talks to ``registry-1.docker.io``).
"""
return self.registry_url
def _validate_outbound_url(
self,
url: str,
*,
enforce_origin: bool = True,
origin_url: str | None = None,
) -> str:
"""Validate a URL before it is passed to ``requests``.
Defenses against parser-mismatch SSRF (PRWLRHELP-2103):
- canonicalise via ``requests.PreparedRequest`` so validator and connector
parse the same string the same way;
- reject schemes other than http/https;
- reject literal non-public IPs (private, loopback, link-local, ...);
- reject hostnames whose A/AAAA records resolve to non-public IPs;
- when ``enforce_origin=True``, reject hosts that don't share the
registry's registrable domain.
Returns the canonical URL the caller should pass to ``requests``.
"""
parsed = urlparse(url)
if parsed.scheme not in ("http", "https"):
raise ImageRegistryAuthError(
file=__file__,
message=(
f"Disallowed URL scheme: {parsed.scheme!r}. Only http/https are allowed."
),
)
try:
prepared = requests.Request("GET", url).prepare()
except (
requests.exceptions.InvalidURL,
requests.exceptions.MissingSchema,
ValueError,
) as exc:
raise ImageRegistryAuthError(
file=__file__,
message=f"Malformed URL {url!r}: {exc}",
)
canonical_url = prepared.url
canonical = urlparse(canonical_url)
host = canonical.hostname or ""
if not host:
raise ImageRegistryAuthError(
file=__file__,
message=f"URL has no host: {canonical_url}",
)
try:
addr = ipaddress.ip_address(host)
except ValueError:
try:
infos = socket.getaddrinfo(host, None)
except socket.gaierror:
infos = []
for *_, sockaddr in infos:
resolved_ip = sockaddr[0]
if _ip_is_non_public(resolved_ip):
raise ImageRegistryAuthError(
file=__file__,
message=(
f"Host {host!r} resolves to non-public address {resolved_ip}. "
"This may indicate an SSRF attempt."
),
)
else:
if any(getattr(addr, prop) for prop in _NON_PUBLIC_IP_PROPERTIES):
raise ImageRegistryAuthError(
file=__file__,
message=(
f"URL targets a non-public address: {host}. "
"This may indicate an SSRF attempt."
),
)
if enforce_origin:
registry_host = urlparse(origin_url or self._origin_url()).hostname or ""
if registry_host and host != registry_host:
target_d = _registrable_domain(host)
registry_d = _registrable_domain(registry_host)
if not (target_d and registry_d and target_d == registry_d):
raise ImageRegistryAuthError(
file=__file__,
message=(
f"URL host {host!r} is unrelated to registry host "
f"{registry_host!r}; refusing to follow."
),
)
return canonical_url
def _request_with_retry(self, method: str, url: str, **kwargs) -> requests.Response:
context_label = kwargs.pop("context_label", None) or self.registry_url
kwargs.setdefault("timeout", 30)
@@ -262,15 +131,16 @@ class RegistryAdapter(ABC):
original_exception=last_exception,
)
def _next_page_url(self, resp: requests.Response) -> str | None:
@staticmethod
def _next_page_url(resp: requests.Response) -> str | None:
link_header = resp.headers.get("Link", "")
if not link_header:
return None
match = re.search(r'<([^>]+)>;\s*rel="next"', link_header)
if not match:
return None
url = match.group(1)
if url.startswith("/"):
parsed = urlparse(resp.url)
url = f"{parsed.scheme}://{parsed.netloc}{url}"
return self._validate_outbound_url(url)
if match:
url = match.group(1)
if url.startswith("/"):
parsed = urlparse(resp.url)
return f"{parsed.scheme}://{parsed.netloc}{url}"
return url
return None
@@ -207,14 +207,15 @@ class DockerHubAdapter(RegistryAdapter):
message=f"Unexpected error during {context} on Docker Hub (HTTP {resp.status_code}): {resp.text[:200]}",
)
def _next_tag_page_url(self, resp: requests.Response) -> str | None:
@staticmethod
def _next_tag_page_url(resp: requests.Response) -> str | None:
link_header = resp.headers.get("Link", "")
if not link_header:
return None
match = re.search(r'<([^>]+)>;\s*rel="next"', link_header)
if not match:
return None
next_url = match.group(1)
if next_url.startswith("/"):
next_url = f"{_REGISTRY_HOST}{next_url}"
return self._validate_outbound_url(next_url, origin_url=_REGISTRY_HOST)
if match:
next_url = match.group(1)
if next_url.startswith("/"):
return f"{_REGISTRY_HOST}{next_url}"
return next_url
return None
@@ -3,6 +3,7 @@
from __future__ import annotations
import base64
import ipaddress
import re
from typing import TYPE_CHECKING
from urllib.parse import urlparse
@@ -42,9 +43,6 @@ class OciRegistryAdapter(RegistryAdapter):
url = f"https://{url}"
return url
def _origin_url(self) -> str:
return self._base_url
def list_repositories(self) -> list[str]:
self._ensure_auth()
repositories: list[str] = []
@@ -129,9 +127,8 @@ class OciRegistryAdapter(RegistryAdapter):
file=__file__,
message=f"Cannot parse token endpoint from registry {self.registry_url}. Www-Authenticate: {www_authenticate[:200]}",
)
realm = self._validate_outbound_url(match.group(1))
if urlparse(realm).scheme == "http":
logger.warning(f"Bearer token realm uses HTTP (not HTTPS): {realm}")
realm = match.group(1)
self._validate_realm_url(realm)
params: dict = {}
service_match = re.search(r'service="([^"]+)"', www_authenticate)
if service_match:
@@ -159,6 +156,27 @@ class OciRegistryAdapter(RegistryAdapter):
)
return token
@staticmethod
def _validate_realm_url(realm: str) -> None:
parsed = urlparse(realm)
if parsed.scheme not in ("http", "https"):
raise ImageRegistryAuthError(
file=__file__,
message=f"Bearer token realm has disallowed scheme: {parsed.scheme}. Only http/https are allowed.",
)
if parsed.scheme == "http":
logger.warning(f"Bearer token realm uses HTTP (not HTTPS): {realm}")
hostname = parsed.hostname or ""
try:
addr = ipaddress.ip_address(hostname)
if addr.is_private or addr.is_loopback or addr.is_link_local:
raise ImageRegistryAuthError(
file=__file__,
message=f"Bearer token realm points to a private/loopback address: {hostname}. This may indicate an SSRF attempt.",
)
except ValueError:
pass
def _resolve_basic_credentials(self) -> tuple[str | None, str | None]:
"""Decode pre-encoded base64 auth tokens (e.g., from aws ecr get-authorization-token).
@@ -188,24 +206,14 @@ class OciRegistryAdapter(RegistryAdapter):
def _do_authed_request(self, method: str, url: str, **kwargs) -> requests.Response:
headers = kwargs.pop("headers", {})
if self._is_same_origin_as_registry(url):
if self._bearer_token:
headers["Authorization"] = f"Bearer {self._bearer_token}"
elif self.username and self.password:
user, pwd = self._resolve_basic_credentials()
kwargs.setdefault("auth", (user, pwd))
if self._bearer_token:
headers["Authorization"] = f"Bearer {self._bearer_token}"
elif self.username and self.password:
user, pwd = self._resolve_basic_credentials()
kwargs.setdefault("auth", (user, pwd))
kwargs["headers"] = headers
return self._request_with_retry(method, url, **kwargs)
def _is_same_origin_as_registry(self, url: str) -> bool:
target = urlparse(url)
origin = urlparse(self._base_url)
return (
target.scheme == origin.scheme
and (target.hostname or "") == (origin.hostname or "")
and target.port == origin.port
)
def _check_response(self, resp: requests.Response, context: str) -> None:
if resp.status_code == 200:
return
@@ -66,6 +66,7 @@ class OraclecloudProvider(Provider):
_compartments: list = []
_mutelist: OCIMutelist
audit_metadata: Audit_Metadata
_home_region: str = "us-ashburn-1"
def __init__(
self,
@@ -160,6 +161,13 @@ class OraclecloudProvider(Provider):
# Get regions
self._regions = self.get_regions_to_audit(region)
self._home_region = None
if self._regions:
self._home_region = next(
(region.key for region in self._regions if region.is_home_region),
self._regions[0].key,
)
logger.info(f"Home region is: {self._home_region}")
# Get compartments
self._compartments = self.get_compartments_to_audit(
@@ -217,6 +225,10 @@ class OraclecloudProvider(Provider):
def regions(self):
return self._regions
@property
def home_region(self):
return self._home_region
@property
def compartments(self):
return self._compartments
@@ -1,6 +1,7 @@
"""OCI Identity Service Module."""
from datetime import datetime
from threading import Lock
from typing import Optional
import oci
@@ -26,6 +27,7 @@ class Identity(OCIService):
self.policies = []
self.dynamic_groups = []
self.domains = []
self._domains_lock = Lock()
self.password_policy = None
self.root_compartment_resources = []
self.active_non_root_compartments = []
@@ -61,8 +63,8 @@ class Identity(OCIService):
regional_client: Regional OCI client
"""
try:
# Identity is a global service, use home region
if regional_client.region not in self.provider.identity.region:
# Only use one region for global users
if regional_client.region != self.provider.home_region:
return
identity_client = self.__get_client__(regional_client.region)
@@ -312,7 +314,8 @@ class Identity(OCIService):
def __list_groups__(self, regional_client):
"""List all IAM groups."""
try:
if regional_client.region not in self.provider.identity.region:
# Only use one region for global groups
if regional_client.region != self.provider.home_region:
return
identity_client = self.__get_client__(regional_client.region)
@@ -355,7 +358,8 @@ class Identity(OCIService):
def __list_policies__(self, regional_client):
"""List all IAM policies."""
try:
if regional_client.region not in self.provider.identity.region:
# Only use one region for global policies
if regional_client.region != self.provider.home_region:
return
identity_client = self.__get_client__(regional_client.region)
@@ -399,8 +403,8 @@ class Identity(OCIService):
def __list_dynamic_groups__(self, regional_client):
"""List all dynamic groups in the tenancy."""
try:
# Dynamic groups are only in the home region
if regional_client.region not in self.provider.identity.region:
# Only use one region for global dynamic groups
if regional_client.region != self.provider.home_region:
return
identity_client = self.__get_client__(regional_client.region)
@@ -447,10 +451,6 @@ class Identity(OCIService):
def __list_domains__(self, regional_client):
"""List all identity domains."""
try:
# Domains are only in the home region
if regional_client.region not in self.provider.identity.region:
return
identity_client = self.__get_client__(regional_client.region)
logger.info("Identity - Listing Identity Domains...")
@@ -458,6 +458,7 @@ class Identity(OCIService):
try:
# List all domains in the tenancy
for compartment in self.audited_compartments:
domains = oci.pagination.list_call_get_all_results(
identity_client.list_domains,
compartment_id=compartment.id,
@@ -465,20 +466,38 @@ class Identity(OCIService):
).data
for domain in domains:
self.domains.append(
IdentityDomain(
id=domain.id,
display_name=domain.display_name,
description=domain.description or "",
url=domain.url,
home_region=domain.home_region,
compartment_id=compartment.id,
lifecycle_state=domain.lifecycle_state,
time_created=domain.time_created,
region=regional_client.region,
password_policies=[],
# Threads run __list_domains__ concurrently per
# region; serialize the dedupe-then-append so two
# regions returning the same domain cannot race
# past each other and produce duplicates or lose
# the home-region preference.
with self._domains_lock:
existing = next(
(d for d in self.domains if d.id == domain.id),
None,
)
if existing is not None:
# Prefer the entry from the domain's home region
if domain.home_region == regional_client.region:
self.domains.remove(existing)
else:
continue
self.domains.append(
IdentityDomain(
id=domain.id,
display_name=domain.display_name,
description=domain.description or "",
url=domain.url,
home_region=domain.home_region,
compartment_id=compartment.id,
lifecycle_state=domain.lifecycle_state,
time_created=domain.time_created,
region=regional_client.region,
password_policies=[],
)
)
)
except Exception as error:
logger.error(
@@ -493,8 +512,8 @@ class Identity(OCIService):
def __list_domain_password_policies__(self, regional_client):
"""List password policies for all identity domains."""
try:
# Password policies are only in the home region
if regional_client.region not in self.provider.identity.region:
# Only use one region for all domain scan
if regional_client.region != self.provider.home_region:
return
logger.info("Identity - Listing Domain Password Policies...")
@@ -551,7 +570,8 @@ class Identity(OCIService):
def __get_password_policy__(self, regional_client):
"""Get the password policy for the tenancy."""
try:
if regional_client.region not in self.provider.identity.region:
# Only use one region for global password policies
if regional_client.region != self.provider.home_region:
return
identity_client = self.__get_client__(regional_client.region)
@@ -578,8 +598,8 @@ class Identity(OCIService):
def __search_root_compartment_resources__(self, regional_client):
"""Search for resources in the root compartment using OCI Resource Search."""
try:
# Search is a global service, use home region
if regional_client.region not in self.provider.identity.region:
# Only use one region for global search
if regional_client.region != self.provider.home_region:
return
logger.info("Identity - Searching for resources in root compartment...")
@@ -626,10 +646,9 @@ class Identity(OCIService):
def __search_active_non_root_compartments__(self, regional_client):
"""Search for active non-root compartments using OCI Resource Search."""
try:
# Search is a global service, use home region
if regional_client.region not in self.provider.identity.region:
# Only use one region for global search
if regional_client.region != self.provider.home_region:
return
logger.info("Identity - Searching for active non-root compartments...")
# Create search client using the helper method for proper authentication
-27
View File
@@ -1,27 +0,0 @@
from typing import Optional
def extract_billing_plan(data: Optional[dict]) -> Optional[str]:
"""Return the Vercel billing plan from a user or team payload.
Vercel's REST API consistently returns the plan identifier at
``data["billing"]["plan"]`` (e.g. ``"hobby"``, ``"pro"``, ``"enterprise"``)
on both ``GET /v2/user`` and ``GET /v2/teams`` responses, even though the
field is not part of the public OpenAPI schema.
"""
if not isinstance(data, dict):
return None
billing = data.get("billing")
if not isinstance(billing, dict):
return None
plan = billing.get("plan")
return plan.lower() if isinstance(plan, str) else None
def plan_reason_suffix(
billing_plan: Optional[str], unsupported_plans: set[str], explanation: str
) -> str:
"""Return a plan-based explanation suffix only when the plan proves it."""
if billing_plan in unsupported_plans:
return f" This may be expected because {explanation}"
return ""
@@ -84,10 +84,10 @@ class VercelService:
)
if response.status_code == 403:
# Endpoint unavailable for this token/scope; let checks handle it gracefully
logger.info(
# Plan limitation or permission error — return None for graceful handling
logger.warning(
f"{self.service} - Access denied for {path} (403). "
"This may be caused by plan or permission restrictions."
"This may be a plan limitation."
)
return None
-19
View File
@@ -21,7 +21,6 @@ class VercelTeamInfo(BaseModel):
id: str
name: str
slug: str
billing_plan: Optional[str] = None
class VercelIdentityInfo(BaseModel):
@@ -30,27 +29,9 @@ class VercelIdentityInfo(BaseModel):
user_id: Optional[str] = None
username: Optional[str] = None
email: Optional[str] = None
billing_plan: Optional[str] = None
team: Optional[VercelTeamInfo] = None
teams: list[VercelTeamInfo] = Field(default_factory=list)
def get_billing_plan_for(self, scope_id: Optional[str]) -> Optional[str]:
"""Return the billing plan for an explicit user or team scope."""
if not scope_id:
return None
if self.team and self.team.id == scope_id and self.team.billing_plan:
return self.team.billing_plan
for team in self.teams:
if team.id == scope_id:
return team.billing_plan
if self.user_id == scope_id:
return self.billing_plan
return None
class VercelOutputOptions(ProviderOutputOptions):
"""Customize output filenames for Vercel scans."""
@@ -28,8 +28,7 @@
}
},
"Categories": [
"trust-boundaries",
"vercel-hobby-plan"
"trust-boundaries"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,8 +28,7 @@
}
},
"Categories": [
"trust-boundaries",
"vercel-hobby-plan"
"trust-boundaries"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,8 +28,7 @@
}
},
"Categories": [
"trust-boundaries",
"vercel-hobby-plan"
"trust-boundaries"
],
"DependsOn": [],
"RelatedTo": [],
@@ -28,8 +28,7 @@
}
},
"Categories": [
"trust-boundaries",
"vercel-hobby-plan"
"trust-boundaries"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,8 +28,7 @@
}
},
"Categories": [
"encryption",
"vercel-hobby-plan"
"encryption"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,8 +28,7 @@
}
},
"Categories": [
"trust-boundaries",
"vercel-hobby-plan"
"trust-boundaries"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,8 +28,7 @@
}
},
"Categories": [
"trust-boundaries",
"vercel-hobby-plan"
"trust-boundaries"
],
"DependsOn": [],
"RelatedTo": [],
@@ -28,8 +28,7 @@
}
},
"Categories": [
"internet-exposed",
"vercel-hobby-plan"
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,8 +28,7 @@
}
},
"Categories": [
"internet-exposed",
"vercel-hobby-plan"
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
@@ -28,8 +28,7 @@
}
},
"Categories": [
"secrets",
"vercel-hobby-plan"
"secrets"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,8 +28,7 @@
}
},
"Categories": [
"secrets",
"vercel-hobby-plan"
"secrets"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,8 +28,7 @@
}
},
"Categories": [
"secrets",
"vercel-hobby-plan"
"secrets"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,8 +28,7 @@
}
},
"Categories": [
"internet-exposed",
"vercel-hobby-plan"
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
@@ -28,12 +28,11 @@
}
},
"Categories": [
"internet-exposed",
"vercel-pro-plan"
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [
"project_deployment_protection_enabled"
],
"Notes": "Required billing plan: Enterprise, or as a paid add-on for Pro plans."
"Notes": ""
}

Some files were not shown because too many files have changed in this diff Show More