Compare commits

...

46 Commits

Author SHA1 Message Date
StylusFrost cf99e02ceb Merge branch 'master' into PROWLER-1391-provider-contract-dynamic-discovery 2026-05-05 08:57:11 +02:00
Pepe Fagoaga 703a33108c chore(changelog): prepare for v5.25.2 (#10991) 2026-05-05 08:47:28 +02:00
Pepe Fagoaga 7c6d658154 fix(k8s): match RBAC rules by apiGroup, not just core (#10969)
Co-authored-by: Andoni A. <14891798+andoniaf@users.noreply.github.com>
2026-05-04 19:54:03 +02:00
Pepe Fagoaga 21d7d08b4b fix(timeline): Return a compact actor name from CloudTrail events (#10986) 2026-05-04 19:39:17 +02:00
Pepe Fagoaga f314725f4d fix(k8s): deduplicate RBAC findings by unique subject (#10242)
Co-authored-by: Andoni A. <14891798+andoniaf@users.noreply.github.com>
2026-05-04 18:11:38 +02:00
Rubén De la Torre Vico 02f43a7ad6 docs: add Prowler Studio page and remove check-kreator pages (#10981) 2026-05-04 17:51:02 +02:00
Daniel Barranquero 0dd8981ee4 feat: add issue template for creating new checks (#10976) 2026-05-04 17:47:39 +02:00
Rubén De la Torre Vico 269e51259d docs: add troubleshooting guide for stuck scans after worker crash (#10938) 2026-05-04 17:24:09 +02:00
Hugo Pereira Brito f4afdf0541 chore(ui): decrement changelog entry version to 1.25.2 (#10974) 2026-05-04 14:59:27 +01:00
Hugo Pereira Brito 652cb69216 fix(ui): compliance card layout polish (#10939) 2026-05-04 12:59:06 +01:00
Daniel Barranquero 921f49a0de feat(aws): add bedrock_prompt_management_exists security check (#10878) 2026-05-04 12:38:15 +02:00
Hugo Pereira Brito 6cb770fcc8 fix(ui): clean up findings expanded resource row layout (#10949) 2026-05-04 11:17:54 +01:00
Daniel Barranquero 86449fb99d chore(vercel): add disclaimer for checks depending on billing plan (#10663) 2026-05-04 08:56:50 +02:00
Andoni Alonso 40dd0e640b fix(sdk): strip http(s):// scheme from image registry URLs (#10950) 2026-05-04 08:37:46 +02:00
StylusFrost 9681901174 style(sdk): satisfy black and vulture in test_dynamic_provider_loading
Two unrelated lint blockers surfaced by CI once the file came into scope
of `black --check .` and `vulture` on the changed-file path:

- Reformat one if/and conditional that black wanted on a single line.
- Underscore-prefix unused parameter names on FakeProvider stub methods
  (from_cli_args, get_output_options, display_compliance_table,
  generate_compliance_output, FakePureContractProvider.from_cli_args)
  and the throwaway **kw in a MagicMock side_effect lambda. These are
  test fixtures whose method bodies don't reference those args; the
  signatures must match the abstract contract on Provider, so we keep
  positions/types and just rename to indicate intentional non-use.

No logic change.
2026-05-03 22:55:24 +02:00
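An illustrative sketch of the rename pattern described above (class and method names follow the message; the bodies are simplified stand-ins, not the actual test fixtures):

```python
from unittest.mock import MagicMock


class FakeProvider:
    # The stub must keep the parameter's position and type to satisfy the
    # abstract contract on Provider, but its body never reads the value;
    # the leading underscore marks the non-use as intentional for vulture.
    @classmethod
    def from_cli_args(cls, _args):
        return cls()


# Same idea for the throwaway **kw in a MagicMock side_effect lambda.
mocked = MagicMock(side_effect=lambda *_args, **_kw: None)
```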
StylusFrost bbe3a7dbf8 refactor(sdk): extract is_builtin_provider to leaf module to break import cycle
CodeQL flagged a cyclic import after `prowler/lib/check/utils.py` and
`prowler/lib/check/check.py` started importing `Provider` from
`prowler.providers.common.provider`. That module transitively imports
`prowler.config.config`, which imports back into `prowler.lib.check.*`
(`compliance_models`, `external_tool_providers`) — closing the cycle.

Apply the same pattern already used for `is_tool_wrapper_provider`:
extract the predicate to a leaf module, `prowler.providers.common.builtin`,
that depends only on `importlib.util`. `Provider.is_builtin` delegates to
the leaf, and call sites in `prowler.lib.check.*` now import directly
from the leaf — no more cycle.

Also underscore-prefix unused parameters on the abstract stubs in
`Provider` (get_finding_output_data, generate_compliance_output,
display_compliance_table) so vulture stops flagging them now that the
file is in the diff.
2026-05-03 22:46:00 +02:00
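A condensed sketch of the extraction (module path and semantics follow the message; the body is simplified):

```python
# prowler/providers/common/builtin.py: the leaf module, depending only
# on importlib.util, so importing it can never close an import cycle.
import importlib.util


def is_builtin_provider(provider_type: str) -> bool:
    """True when prowler.providers.{provider_type} ships with Prowler."""
    try:
        spec = importlib.util.find_spec(f"prowler.providers.{provider_type}")
    except ModuleNotFoundError:
        return False
    return spec is not None
```

`Provider.is_builtin` then reduces to a one-line delegation to this helper, and the `prowler.lib.check.*` call sites import the predicate from the leaf instead of from `prowler.providers.common.provider`.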
StylusFrost 0672c80563 fix(sdk): guard find_spec with is_builtin for external provider discovery
Calling importlib.util.find_spec on prowler.providers.{provider}.services
for an external provider propagates ModuleNotFoundError when the parent
package prowler.providers.{provider} does not exist, instead of returning
None. This caused recover_checks_from_provider, _resolve_check_module, and
Scan.scan to fail with "No module named 'prowler.providers.{external}'"
even though the plug-in had correctly registered its checks via entry points.

Gate the built-in branch on Provider.is_builtin (which already wraps the
find_spec in try/except) and reuse _resolve_check_module from Scan.scan
so external providers fall through to the entry-point lookup.
2026-05-03 22:31:31 +02:00
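A minimal reproduction of the failure mode being guarded against (the external provider name is hypothetical):

```python
import importlib.util

# Built-in provider, missing service module: the parent packages import
# fine, so find_spec simply returns None.
print(importlib.util.find_spec("prowler.providers.aws.services.nope"))  # None

# External provider: the parent package prowler.providers.myplugin does
# not exist, so importing it fails and the error propagates instead of
# find_spec returning None.
try:
    importlib.util.find_spec("prowler.providers.myplugin.services")
except ModuleNotFoundError as error:
    print(error)  # No module named 'prowler.providers.myplugin'
```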
StylusFrost 92d7ea2170 Merge remote-tracking branch 'origin/master' into PROWLER-1391-provider-contract-dynamic-discovery
# Conflicts:
#	prowler/config/config.py
2026-05-03 19:41:25 +02:00
StylusFrost c7aa536896 fix(sdk): built-in wins on plug-in collision for providers and checks 2026-04-30 19:50:19 +02:00
StylusFrost e7f23bb13f fix(sdk): propagate provider argument from report to stdout_report 2026-04-30 14:06:17 +02:00
StylusFrost 82132a9341 fix(sdk): use find_spec to distinguish missing vs broken built-ins 2026-04-30 13:53:06 +02:00
StylusFrost 5e876579f8 fix(sdk): use is_tool_wrapper_provider for compliance framework gate 2026-04-30 10:12:24 +02:00
StylusFrost be49fd8c4e Merge branch 'master' into PROWLER-1391-provider-contract-dynamic-discovery 2026-04-28 14:48:21 +02:00
StylusFrost 15d8f1642e test(sdk): unit tests for tool_wrapper leaf module 2026-04-28 14:45:07 +02:00
StylusFrost 79f12f3617 refactor(sdk): extract is_tool_wrapper_provider to leaf module to break import cycle 2026-04-28 13:57:27 +02:00
StylusFrost 6715361246 fix(sdk): restore dynamic external providers help in CLI epilog 2026-04-28 12:47:50 +02:00
StylusFrost 45e946cd87 Merge branch 'master' into PROWLER-1391-provider-contract-dynamic-discovery 2026-04-28 12:42:18 +02:00
StylusFrost 7836905b82 fix(sdk): consult Provider.is_tool_wrapper_provider in check discovery 2026-04-28 12:20:42 +02:00
StylusFrost 52f6653ccf fix(sdk): use equality not substring in provider dispatch chain 2026-04-28 11:54:23 +02:00
StylusFrost a5de6608ae fix(sdk): restore llm in parser usage line to match epilog 2026-04-28 10:50:43 +02:00
StylusFrost 1cdce02397 fix(sdk): use startswith("-") to detect CLI flags so external provider names with hyphens are not misparsed 2026-04-24 20:59:07 +02:00
StylusFrost a31fe9b618 Merge branch 'master' into PROWLER-1391-provider-contract-dynamic-discovery
Conflict in prowler/config/config.py resolved by combining both branches:
- HEAD: external compliance discovery via entry points (PROWLER-1391)
- master: multi-provider framework JSONs scanned at top-level compliance/ (#10300)

Order: built-in per-provider -> built-in multi-provider -> external entry points.
Built-ins first so they win on name collisions against external registrations.

Supporting external plug-ins to register multi-provider frameworks is tracked
in PROWLER-1444.
2026-04-24 20:54:22 +02:00
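With that ordering, the collision policy reduces to first-write-wins when aggregating; a hedged sketch of the idea (function name is illustrative, not from the codebase):

```python
def merge_frameworks(*sources: dict) -> dict:
    """Merge framework dicts; earlier sources win on name collisions.

    Called as merge_frameworks(builtin_per_provider, builtin_multi_provider,
    external_entry_points), so built-ins always beat external registrations.
    """
    merged: dict = {}
    for source in sources:
        for name, framework in source.items():
            merged.setdefault(name, framework)  # first write wins
    return merged
```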
StylusFrost 907166d88a fix(sdk): discriminate builtin vs external providers via find_spec for clearer import errors 2026-04-24 20:33:38 +02:00
StylusFrost 0883baad78 fix(sdk): external providers with --service and external checks for new services 2026-04-24 20:18:20 +02:00
StylusFrost cf70d1f9f8 fix(sdk): honor from_cli_args return value in init_global_provider fallback 2026-04-24 18:51:57 +02:00
StylusFrost 60e7657081 feat(sdk): wire is_external_tool_provider property to execution and metadata validators 2026-04-24 18:23:42 +02:00
StylusFrost e8487d0686 fix(sdk): unwrap namespaced config for all built-in and external providers
load_and_validate_config_file only detected the namespaced format for 5
hardcoded providers (aws, gcp, azure, kubernetes, m365). For every other
built-in (github, nhn, vercel, cloudflare, iac, llm, image, mongodbatlas,
oraclecloud, openstack, alibabacloud, googleworkspace) and for any
external plug-in, the full YAML was returned wrapped instead of the
provider's own block.

Replace the hardcoded list with a dynamic check: if the file has a
top-level key matching the provider and its value is a dict, unwrap it.
Keep the legacy flat format for AWS only (historical, pre-multicloud)
and identify it by the absence of nested-dict top-level values, which
prevents cross-provider config leakage when a namespaced file has no
section for the requested provider.
2026-04-24 18:01:47 +02:00
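A condensed sketch of the detection logic the message describes (the real function appears later in this diff; this helper is a simplification):

```python
def unwrap_provider_config(config_file: dict, provider: str) -> dict:
    # Namespaced format: a top-level key matching the provider whose value
    # is a dict. Works for any built-in or external provider name.
    if isinstance(config_file, dict) and isinstance(config_file.get(provider), dict):
        return config_file[provider] or {}
    # Legacy flat format is AWS-only and has no nested-dict top-level
    # values; that is what keeps a namespaced file without an "aws"
    # section from leaking other providers' config into AWS.
    if (
        provider == "aws"
        and isinstance(config_file, dict)
        and config_file
        and not any(isinstance(v, dict) for v in config_file.values())
    ):
        return config_file
    return {}
```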
StylusFrost 9c056beed1 Merge branch 'master' into PROWLER-1391-provider-contract-dynamic-discovery 2026-04-22 09:39:10 +02:00
StylusFrost f60f7c61c7 feat(provider): add display_compliance_table method for provider-specific compliance rendering 2026-04-21 19:40:38 +02:00
StylusFrost 3deb1359a5 Merge branch 'master' into PROWLER-1391-provider-contract-dynamic-discovery 2026-04-21 18:52:13 +02:00
StylusFrost e2295bd086 feat(provider): implement get_mutelist_finding_args for external providers and add tests 2026-04-21 18:50:18 +02:00
StylusFrost e27317437d feat(external-provider): add dynamic loading tests and coverage for external provider 2026-04-21 14:37:50 +02:00
StylusFrost 6f6016d822 chore: update CHANGELOG for Prowler v5.25.0 with new features 2026-04-21 14:22:24 +02:00
StylusFrost 5f10e1c1b6 Merge branch 'master' into PROWLER-1391-provider-contract-dynamic-discovery 2026-04-21 14:19:52 +02:00
StylusFrost 484211b465 fix(sdk): align exception handlers to SDK convention and improve test coverage 2026-04-21 14:14:17 +02:00
StylusFrost f8333baf24 feat: Enhance dynamic provider loading and compliance framework discovery
- Implemented dynamic loading of external providers via entry points, allowing for greater flexibility in provider integration.
- Added functionality to discover compliance directories from entry points, enabling external compliance frameworks to be loaded seamlessly.
- Refactored check module resolution to prioritize built-in checks while falling back to entry points if necessary.
- Improved compliance framework loading to include both built-in and external sources, ensuring comprehensive compliance coverage.
- Enhanced CLI argument parsing to support external providers, improving user experience and configurability.
- Introduced extensive unit tests to validate dynamic loading, compliance discovery, and overall integration of external providers.
2026-04-15 13:22:57 +02:00
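For orientation, the entry-point mechanics behind those bullets look roughly like this (the `prowler.compliance` and `prowler.checks.<provider>` group names appear later in this diff; the provider and check names below are hypothetical):

```python
import importlib
import importlib.metadata

# Discover external compliance directories: one entry point per provider
# in the "prowler.compliance" group.
for ep in importlib.metadata.entry_points(group="prowler.compliance"):
    package = ep.load()
    print(ep.name, package.__path__[0])  # provider name -> compliance dir

# Resolve an external check: the entry name is the CheckID and the entry
# value is the module path to import.
for ep in importlib.metadata.entry_points(group="prowler.checks.myprovider"):
    if ep.name == "myservice_resource_secure":
        check_module = importlib.import_module(ep.value)
```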
121 changed files with 5238 additions and 643 deletions
@@ -0,0 +1,143 @@
name: "🔎 New Check Request"
description: Request a new Prowler security check
title: "[New Check]: "
labels: ["feature-request", "status/needs-triage"]
body:
  - type: checkboxes
    id: search
    attributes:
      label: Existing check search
      description: Confirm this check does not already exist before opening a new request.
      options:
        - label: I have searched existing issues, Prowler Hub, and the public roadmap, and this check does not already exist.
          required: true
  - type: markdown
    attributes:
      value: |
        Use this form to describe the security condition that Prowler should evaluate.
        The most useful inputs for [Prowler Studio](https://github.com/prowler-cloud/prowler-studio) are:
        - What should be detected
        - What PASS and FAIL mean
        - Vendor docs, API references, SDK methods, CLI commands, or reference code
  - type: dropdown
    id: provider
    attributes:
      label: Provider
      description: Cloud or platform this check targets.
      options:
        - AWS
        - Azure
        - GCP
        - Kubernetes
        - GitHub
        - Microsoft 365
        - OCI
        - Alibaba Cloud
        - Cloudflare
        - MongoDB Atlas
        - Google Workspace
        - OpenStack
        - Vercel
        - NHN
        - Other / New provider
    validations:
      required: true
  - type: input
    id: other_provider_name
    attributes:
      label: New provider name
      description: Only fill this if you selected "Other / New provider" above.
      placeholder: "NewProviderName"
    validations:
      required: false
  - type: input
    id: service_name
    attributes:
      label: Service or product area
      description: Optional. Main service, product, or feature to audit.
      placeholder: "s3, bedrock, entra, repository, apiserver"
    validations:
      required: false
  - type: input
    id: suggested_check_name
    attributes:
      label: Suggested check name
      description: Optional. Use `snake_case` following `<service>_<resource>_<best_practice>`, with lowercase letters and underscores only.
      placeholder: "bedrock_guardrail_sensitive_information_filter_enabled"
    validations:
      required: false
  - type: textarea
    id: context
    attributes:
      label: Context and goal
      description: Describe the security problem, why it matters, and what this new check should help detect.
      placeholder: |-
        - Security condition to validate:
        - Why it matters:
        - Resource, feature, or configuration involved:
    validations:
      required: true
  - type: textarea
    id: expected_behavior
    attributes:
      label: Expected behavior
      description: Explain what the check should evaluate and what PASS, FAIL, or MANUAL should mean.
      placeholder: |-
        - Resource or scope to evaluate:
        - PASS when:
        - FAIL when:
        - MANUAL when (if applicable):
        - Exclusions, thresholds, or edge cases:
    validations:
      required: true
  - type: textarea
    id: references
    attributes:
      label: References
      description: Add vendor docs, API references, SDK methods, CLI commands, endpoint docs, sample payloads, or similar reference material.
      placeholder: |-
        - Product or service documentation:
        - API or SDK reference:
        - CLI command or endpoint documentation:
        - Sample payload or response:
        - Security advisory or benchmark:
    validations:
      required: true
  - type: dropdown
    id: severity
    attributes:
      label: Suggested severity
      description: Your best estimate. Reviewers will confirm during triage.
      options:
        - Critical
        - High
        - Medium
        - Low
        - Informational
        - Not sure
    validations:
      required: true
  - type: textarea
    id: implementation_notes
    attributes:
      label: Additional implementation notes
      description: Optional. Add permissions, unsupported regions, config knobs, product limitations, or anything else that may affect implementation.
      placeholder: |-
        - Required permissions or scopes:
        - Region, tenant, or subscription limitations:
        - Configurable behavior or thresholds:
        - Other constraints:
    validations:
      required: false
+26
@@ -516,6 +516,32 @@ jobs:
          flags: prowler-py${{ matrix.python-version }}-vercel
          files: ./vercel_coverage.xml
      # External Provider (dynamic loading)
      - name: Check if External Provider files changed
        if: steps.check-changes.outputs.any_changed == 'true'
        id: changed-external
        uses: tj-actions/changed-files@22103cc46bda19c2b464ffe86db46df6922fd323 # v47.0.5
        with:
          files: |
            ./prowler/providers/common/**
            ./prowler/config/**
            ./prowler/lib/**
            ./tests/providers/external/**
            ./poetry.lock
      - name: Run External Provider tests
        if: steps.changed-external.outputs.any_changed == 'true'
        run: poetry run pytest -n auto --cov=./prowler/providers/common --cov=./prowler/config --cov=./prowler/lib --cov-report=xml:external_coverage.xml tests/providers/external
      - name: Upload External Provider coverage to Codecov
        if: steps.changed-external.outputs.any_changed == 'true'
        uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
        env:
          CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
        with:
          flags: prowler-py${{ matrix.python-version }}-external
          files: ./external_coverage.xml
      # Lib
      - name: Check if Lib files changed
        if: steps.check-changes.outputs.any_changed == 'true'
+1 -1
@@ -6,7 +6,7 @@ All notable changes to the **Prowler API** are documented in this file.
### 🚀 Added
- New `scan-reset-ephemeral-resources` post-scan task zeroes `failed_findings_count` for resources missing from the latest full-scope scan, keeping ephemeral resources from polluting the Resources page sort [(#10929)](https://github.com/prowler-cloud/prowler/pull/10929)
- `scan-reset-ephemeral-resources` post-scan task zeroes `failed_findings_count` for resources missing from the latest full-scope scan, keeping ephemeral resources from polluting the Resources page sort [(#10929)](https://github.com/prowler-cloud/prowler/pull/10929)
---
@@ -215,3 +215,6 @@ Also is important to keep all code examples as short as possible, including the
| e5 | M365 and Azure Entra checks enabled by or dependent on an E5 license (e.g., advanced threat protection, audit, DLP, and eDiscovery) |
| privilege-escalation | Detects IAM policies or permissions that allow identities to elevate their privileges beyond their intended scope, potentially gaining administrator or higher-level access through specific action combinations |
| ec2-imdsv1 | Identifies EC2 instances using Instance Metadata Service version 1 (IMDSv1), which is vulnerable to SSRF attacks and should be replaced with IMDSv2 for enhanced security |
| vercel-hobby-plan | Vercel checks whose audited feature is available on the Hobby plan (and therefore also on Pro and Enterprise plans) |
| vercel-pro-plan | Vercel checks whose audited feature requires a Pro plan or higher, including features also available on Enterprise or via supported paid add-ons for Pro plans |
| vercel-enterprise-plan | Vercel checks whose audited feature requires the Enterprise plan |
+20 -6
@@ -27,14 +27,28 @@ The most common high level steps to create a new check are:
### Naming Format for Checks
Checks must be named following the format: `service_subservice_resource_action`.
If you already know the check name when creating a request or implementing a check, use a descriptive identifier with lowercase letters and underscores only.
Recommended patterns:
- `<service>_<resource>_<best_practice>`
The name components are:
- `service` The main service being audited (e.g., ec2, entra, iam, etc.)
- `subservice` An individual component or subset of functionality within the service that is being audited. This may correspond to a shortened version of the class attribute accessed within the check. If there is no subservice, just omit.
- `resource` The specific resource type being evaluated (e.g., instance, policy, role, etc.)
- `action` The security aspect or configuration being checked (e.g., public, encrypted, enabled, etc.)
- `service` The main service or product area being audited (e.g., ec2, entra, iam, bedrock).
- `resource` The resource, feature, or configuration being evaluated. It can be a single word or a compound phrase joined with underscores (e.g., instance, policy, guardrail, sensitive_information_filter).
- `best_practice` The expected secure state or best practice being checked (e.g., enabled, encrypted, restricted, configured, not_publicly_accessible).
Additional guidance:
- Use underscores only. Do not use hyphens.
- Keep the name specific enough to describe the behavior of the check.
- The first segment should match the service or product area whenever possible.
Examples (a quick name sanity-check sketch follows):
- `s3_bucket_versioning_enabled`
- `bedrock_guardrail_sensitive_information_filter_enabled`
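If it helps, here is a throwaway sanity check for a proposed name against these rules (a sketch, not part of Prowler's validators):

```python
import re


def looks_like_valid_check_name(name: str) -> bool:
    # Lowercase letters, digits, and underscores only, with a service
    # segment plus at least one more segment.
    return bool(re.fullmatch(r"[a-z0-9]+(_[a-z0-9]+)+", name))


assert looks_like_valid_check_name("s3_bucket_versioning_enabled")
assert not looks_like_valid_check_name("bedrock-guardrail-enabled")  # hyphens are rejected
```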
### File Creation
@@ -387,7 +401,7 @@ Provides both code examples and best practice recommendations for addressing the
#### Categories
One or more functional groupings used for execution filtering (e.g., `internet-exposed`). You can define new categories just by adding to this field.
One or more functional groupings used for execution filtering (e.g., `internet-exposed`). Categories must match the predefined values enforced by `CheckMetadata`; adding a new category requires updating the validator and the metadata documentation.
For the complete list of available categories, see [Categories Guidelines](/developer-guide/check-metadata-guidelines#categories-guidelines).
+131
@@ -0,0 +1,131 @@
---
title: 'Prowler Studio'
---
**Prowler Studio is an AI workflow that ensures Claude Code follows Prowler's skills, guardrails, and best practices when creating new security checks.** What lands in the resulting pull request is consistent, tested, and ready for human review — not half-correct boilerplate that needs to be rewritten.
<Info>
**Contributor Tool**: Prowler Studio is a workflow for advanced contributors adding new Prowler security checks. It is not part of Prowler Cloud, Prowler App, or Prowler CLI.
</Info>
<Warning>
**Preview Feature**: Prowler Studio is under active development and breaking changes are expected. Please report issues or share feedback on [GitHub](https://github.com/prowler-cloud/prowler-studio/issues) or in the [Slack community](https://goto.prowler.com/slack).
</Warning>
<Card title="Prowler Studio Repository" icon="github" href="https://github.com/prowler-cloud/prowler-studio" horizontal>
Clone the source code, install Prowler Studio, and explore the agent workflow in detail.
</Card>
## The Problem
Adding a new check to [Prowler](https://github.com/prowler-cloud/prowler) is more than writing detection logic. A correct check has to:
- Match Prowler's exact service and check folder structure and naming conventions
- Wire up metadata, severity, remediation, tests, and compliance mappings
- Mirror the patterns used by the hundreds of existing checks in the same provider
- Actually load when Prowler scans for available checks — silent structural mistakes are easy to make
Asking a general-purpose AI assistant to do this usually means guessing. It misses conventions, skips tests, or invents structure that looks right but does not load. The result is a half-correct PR that needs to be reviewed line by line or rewritten.
## The Solution
Prowler Studio enforces the workflow end-to-end. Describe the check once — a markdown ticket, a Jira issue, or a GitHub issue — and the workflow:
1. **Loads Prowler-specific skills into every agent.** Every step starts with the same context an experienced Prowler engineer would have in mind. See [AI Skills System](/developer-guide/ai-skills) for how skills are structured.
2. **Runs specialized agents in sequence.** Implementation → testing → compliance mapping → review → PR creation. Each agent has one job and a tight scope.
3. **Verifies as it goes.** The check must load in Prowler. Tests must pass. If something fails, the agent fixes it and re-runs (up to a bounded number of attempts) before moving on.
4. **Produces a complete pull request.** Branch, passing check, tests, compliance mappings, and a pull request waiting for human review.
The result is a consistent starting point, every time, on every supported provider.
## Quick Start
### Install
Prowler Studio requires [`uv`](https://docs.astral.sh/uv/getting-started/installation/); see its official installation guide for setup instructions.
```bash
git clone https://github.com/prowler-cloud/prowler-studio
cd prowler-studio
uv sync
source .venv/bin/activate
```
### Describe the Check
A ticket is a structured markdown description of the check to create. It is the only input the workflow needs; every agent (implementation, testing, compliance mapping, review, PR creation) uses it as the source of truth, so the more concrete it is, the closer the first PR will land to the desired outcome.
The ticket can be supplied in three ways:
- **Local markdown file** → `--ticket path/to/ticket.md`
- **Jira issue** → `--jira-url https://...` (uses the issue body)
- **GitHub issue** → `--github-url https://...` (uses the issue body)
The content should follow the **New Check Request** template:
- The local copy at [`check_ticket_template.md`](https://github.com/prowler-cloud/prowler-studio/blob/main/check_ticket_template.md) covers `--ticket` and Jira tickets.
- A prefilled GitHub form is also available: [Create a New Check Request issue](https://github.com/prowler-cloud/prowler/issues/new?template=new-check-request.yml).
Sections marked *Optional* can be skipped; everything else helps the agents make the right decisions.
### Run the Workflow
From a local markdown ticket:
```bash
prowler-studio --ticket check_ticket.md
```
From a Jira ticket:
```bash
prowler-studio --jira-url https://mycompany.atlassian.net/browse/PROJ-123
```
From a GitHub issue:
```bash
prowler-studio --github-url https://github.com/owner/repo/issues/123
```
<Note>
Provide exactly one of `--ticket`, `--jira-url`, or `--github-url`.
</Note>
Keep changes local (no push, no pull request):
```bash
prowler-studio -b feat/my-check --ticket check_ticket.md --local
```
### What You Get
After a successful run the working environment contains:
- A new branch on a clean Prowler worktree containing the check, metadata, tests, and compliance mappings
- A pull request opened against Prowler (skipped with `--local`)
- A timestamped log file under `logs/` capturing every step the agents took
## CLI Options
| Option | Short | Description |
|--------|-------|-------------|
| `--branch` | `-b` | Branch name (default: `feat/<ticket>-<check_name>` or `feat/<check_name>`) |
| `--ticket` | `-t` | Path to a markdown check ticket file |
| `--jira-url` | `-j` | Jira ticket URL (e.g., `https://mycompany.atlassian.net/browse/PROJ-123`) |
| `--github-url` | `-g` | GitHub issue URL (e.g., `https://github.com/owner/repo/issues/123`) |
| `--working-dir` | `-w` | Working directory for the Prowler clone (default: `./working`) |
| `--no-worktree` | | Legacy mode — work directly on the main clone instead of using worktrees |
| `--cleanup-worktree` | | Remove the worktree after a successful pull request is created |
| `--local` | | Keep changes local — skip push and pull request creation |
## Configuration
Set these environment variables depending on the input source:
| Variable | When Needed | Purpose |
|----------|-------------|---------|
| `GITHUB_TOKEN` | `--github-url` (recommended) | Higher GitHub API rate limits and access to private issues |
| `JIRA_SITE_URL` | `--jira-url` | Jira site, e.g. `https://mycompany.atlassian.net` |
| `JIRA_EMAIL` | `--jira-url` | Email of the Jira account used to fetch the ticket |
| `JIRA_API_TOKEN` | `--jira-url` | API token for the Jira account |
+2 -1
@@ -365,7 +365,8 @@
"developer-guide/security-compliance-framework",
"developer-guide/lighthouse-architecture",
"developer-guide/mcp-server",
"developer-guide/ai-skills"
"developer-guide/ai-skills",
"developer-guide/prowler-studio"
]
},
{
+34
@@ -159,6 +159,40 @@ When these environment variables are set, the API will use them directly instead
A fix addressing this permission issue is being evaluated in [PR #9953](https://github.com/prowler-cloud/prowler/pull/9953).
</Note>
### Scan Stuck in Executing State After Worker Crash
When running Prowler App via Docker Compose, a scan may remain indefinitely in the `executing` state if the worker process crashes (for example, due to an Out of Memory condition) before it can update the scan status. Since it is not currently possible to cancel a scan in the `executing` state through the UI, the workaround is to manually update the scan record in the database.
**Root Cause:**
The Celery worker process terminates unexpectedly (OOM, node failure, etc.) before transitioning the scan state to `completed` or `failed`. The scan record remains in `executing` with no active process to advance it.
**Solution:**
Connect to the database using the `prowler_admin` user. Due to Row-Level Security (RLS), the default database user cannot see scan records — you must use `prowler_admin`:
```bash
psql -U prowler_admin -d prowler_db
```
Identify the stuck scan by filtering for scans in `executing` state:
```sql
SELECT id, name, state, started_at FROM scans WHERE state = 'executing';
```
Update the scan state to `failed` using the scan ID:
```sql
UPDATE scans SET state = 'failed' WHERE id = '<scan-id>';
```
After this change, the scan will appear as failed in the UI and you can launch a new scan.
<Note>
A feature to cancel executing scans directly from the UI is being tracked in [GitHub Issue #6893](https://github.com/prowler-cloud/prowler/issues/6893).
</Note>
### SAML/OAuth ACS URL Incorrect When Running Behind a Proxy or Load Balancer
See [GitHub Issue #9724](https://github.com/prowler-cloud/prowler/issues/9724) for more details.
@@ -1,47 +0,0 @@
---
title: 'Prowler Check Kreator'
---
<Note>
Currently, this tool is only available for creating checks for the AWS provider.
</Note>
<Note>
If you are looking for a way to create new checks for all the supported providers, use [Prowler Studio](https://github.com/prowler-cloud/prowler-studio), an AI-powered toolkit for generating and managing security checks for Prowler and an improved successor to the Check Kreator.
</Note>
## Introduction
**Prowler Check Kreator** is a utility designed to streamline the creation of new checks for Prowler. This tool generates all necessary files required to add a new check to the Prowler repository. Specifically, it creates:
- A dedicated folder for the check.
- The main check script.
- A metadata file with essential details.
- A folder and file structure for testing the check.
## Usage
To use the tool, execute the main script with the following command:
```bash
python util/prowler_check_kreator/prowler_check_kreator.py <prowler_provider> <check_name>
```
Parameters:
- `<prowler_provider>`: Currently only AWS is supported.
- `<check_name>`: The name you wish to assign to the new check.
## AI integration
This tool optionally integrates AI to assist in generating the check code and metadata file content. When AI assistance is chosen, the tool uses [Gemini](https://gemini.google.com/) to produce preliminary code and metadata.
<Note>
For this feature to work, you must have the library `google-generativeai` installed in your Python environment.
</Note>
<Warning>
AI-generated code and metadata might contain errors or require adjustments to align with specific Prowler requirements. Carefully review all AI-generated content before committing.
</Warning>
To enable AI assistance, simply confirm when prompted by the tool. Additionally, ensure that the `GEMINI_API_KEY` environment variable is set with a valid Gemini API key. For instructions on obtaining your API key, refer to the [Gemini documentation](https://ai.google.dev/gemini-api/docs/api-key).
@@ -160,3 +160,25 @@ Prowler for Vercel includes security checks across the following services:
| **Project** | Deployment protection, environment variable security, fork protection, and skew protection |
| **Security** | Web Application Firewall (WAF), rate limiting, IP blocking, and managed rulesets |
| **Team** | SSO enforcement, directory sync, member access, and invitation hygiene |
## Checks With Explicit Plan-Based Behavior
Prowler currently includes 26 Vercel checks. The 11 checks below have explicit billing-plan handling in the provider metadata or check logic. When the scanned scope reports a billing plan, Prowler adds plan-aware context to findings for these checks. If the API does not expose the required configuration, Prowler may return `MANUAL` and require verification in the Vercel dashboard.
| Check ID | Hobby | Pro | Enterprise | Notes |
|----------|-------|-----|------------|-------|
| `project_password_protection_enabled` | Not available | Available as a paid add-on | Available | Checks password protection for deployments |
| `project_production_deployment_protection_enabled` | Not available | Available with supported paid deployment protection options | Available | Checks protection for production deployments |
| `project_skew_protection_enabled` | Not available | Available | Available | Checks skew protection during rollouts |
| `security_custom_rules_configured` | Not available | Available | Available | Returns `MANUAL` when the firewall configuration cannot be assessed from the API |
| `security_ip_blocking_rules_configured` | Not available | Available | Available | Returns `MANUAL` when the firewall configuration cannot be assessed from the API |
| `team_saml_sso_enabled` | Not available | Available | Available | Checks team SAML SSO configuration |
| `team_saml_sso_enforced` | Not available | Available | Available | Checks SAML SSO enforcement for all team members |
| `team_directory_sync_enabled` | Not available | Not available | Available | Checks SCIM directory sync |
| `security_managed_rulesets_enabled` | Bot Protection and AI Bots managed rulesets | Bot Protection and AI Bots managed rulesets | All managed rulesets, including OWASP Core Ruleset | Returns `MANUAL` when the firewall configuration cannot be assessed from the API |
| `security_rate_limiting_configured` | Not available | Available | Available | Returns `MANUAL` when the firewall configuration cannot be assessed from the API |
| `security_waf_enabled` | Not available | Available | Available | Returns `MANUAL` when the firewall configuration cannot be assessed from the API |
<Note>
The five firewall-related checks (`security_waf_enabled`, `security_custom_rules_configured`, `security_ip_blocking_rules_configured`, `security_rate_limiting_configured`, and `security_managed_rulesets_enabled`) return `MANUAL` when the firewall configuration endpoint is not accessible from the API. The other 15 current Vercel checks do not currently include plan-specific handling in provider logic, but every Vercel check includes exactly one billing-plan metadata category (`vercel-hobby-plan`, `vercel-pro-plan`, or `vercel-enterprise-plan`) alongside its functional security category.
</Note>
@@ -1,51 +0,0 @@
---
title: 'Prowler Check Kreator'
---
<Note>
Currently, this tool is only available for creating checks for the AWS provider.
</Note>
<Note>
If you are looking for a way to create new checks for all the supported providers, use [Prowler Studio](https://github.com/prowler-cloud/prowler-studio), an AI-powered toolkit for generating and managing security checks for Prowler and an improved successor to the Check Kreator.
</Note>
## Introduction
**Prowler Check Kreator** is a utility designed to streamline the creation of new checks for Prowler. This tool generates all necessary files required to add a new check to the Prowler repository. Specifically, it creates:
- A dedicated folder for the check.
- The main check script.
- A metadata file with essential details.
- A folder and file structure for testing the check.
## Usage
To use the tool, execute the main script with the following command:
```bash
python util/prowler_check_kreator/prowler_check_kreator.py <prowler_provider> <check_name>
```
Parameters:
- `<prowler_provider>`: Currently only AWS is supported.
- `<check_name>`: The name you wish to assign to the new check.
## AI integration
This tool optionally integrates AI to assist in generating the check code and metadata file content. When AI assistance is chosen, the tool uses [Gemini](https://gemini.google.com/) to produce preliminary code and metadata.
<Note>
For this feature to work, you must have the library `google-generativeai` installed in your Python environment.
</Note>
<Warning>
AI-generated code and metadata might contain errors or require adjustments to align with specific Prowler requirements. Carefully review all AI-generated content before committing.
</Warning>
To enable AI assistance, simply confirm when prompted by the tool. Additionally, ensure that the `GEMINI_API_KEY` environment variable is set with a valid Gemini API key. For instructions on obtaining your API key, refer to the [Gemini documentation](https://ai.google.dev/gemini-api/docs/api-key).
+20 -2
@@ -6,15 +6,17 @@ All notable changes to the **Prowler SDK** are documented in this file.
### 🚀 Added
- Support for external/custom providers, checks, and compliance frameworks without modifying core code [(#10700)](https://github.com/prowler-cloud/prowler/pull/10700)
- `bedrock_guardrails_configured` check for AWS provider [(#10844)](https://github.com/prowler-cloud/prowler/pull/10844)
- Universal compliance pipeline integrated into the CLI: `--list-compliance` and `--list-compliance-requirements` show universal frameworks, and CSV plus OCSF outputs are generated for any framework declaring a `TableConfig` [(#10301)](https://github.com/prowler-cloud/prowler/pull/10301)
- ASD Essential Eight Maturity Model compliance framework for AWS (Maturity Level One, Nov 2023) [(#10808)](https://github.com/prowler-cloud/prowler/pull/10808)
- Vercel checks now return an extended finding status personalized to the billing plan and are classified with billing-plan categories [(#10663)](https://github.com/prowler-cloud/prowler/pull/10663)
- `bedrock_prompt_management_exists` check for AWS provider [(#10878)](https://github.com/prowler-cloud/prowler/pull/10878)
### 🔄 Changed
- `route53_dangling_ip_subdomain_takeover` now also flags `CNAME` records pointing to S3 website endpoints whose buckets are missing from the account [(#10920)](https://github.com/prowler-cloud/prowler/pull/10920)
- Azure Network Watcher flow log checks now require workspace-backed Traffic Analytics for `network_flow_log_captured_sent` and align metadata with VNet-compatible flow log guidance [(#10645)](https://github.com/prowler-cloud/prowler/pull/10645)
- Azure compliance entries for legacy Network Watcher flow log controls now use retirement-aware guidance and point new deployments to VNet flow logs
- Azure compliance entries for legacy Network Watcher flow log controls now use retirement-aware guidance and point new deployments to VNet flow logs [(#10937)](https://github.com/prowler-cloud/prowler/pull/10937)
- AWS CodeBuild service now batches `BatchGetProjects` and `BatchGetBuilds` calls per region (up to 100 items per call) to reduce API call volume and prevent throttling-induced false positives in `codebuild_project_not_publicly_accessible` [(#10639)](https://github.com/prowler-cloud/prowler/pull/10639)
- `display_compliance_table` dispatch switched from substring `in` checks to `startswith` to prevent false matches between similarly named frameworks (e.g. `cisa` vs `cis`) [(#10301)](https://github.com/prowler-cloud/prowler/pull/10301)
@@ -22,6 +24,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- AWS SDK test isolation: autouse `mock_aws` fixture and leak detector in `conftest.py` to prevent tests from hitting real AWS endpoints, with idempotent organization setup for tests calling `set_mocked_aws_provider` multiple times [(#10605)](https://github.com/prowler-cloud/prowler/pull/10605)
- AWS `boto` user agent extra is now applied to every client [(#10944)](https://github.com/prowler-cloud/prowler/pull/10944)
- Image provider connection check no longer fails with a misleading `host='https'` resolution error when the registry URL includes an `http://` or `https://` scheme prefix [(#10950)](https://github.com/prowler-cloud/prowler/pull/10950)
### 🔐 Security
@@ -29,6 +32,17 @@ All notable changes to the **Prowler SDK** are documented in this file.
---
## [5.25.2] (Prowler v5.25.2)
### 🐞 Fixed
- `route53_dangling_ip_subdomain_takeover` now also flags `CNAME` records pointing to S3 website endpoints whose buckets are missing from the account [(#10920)](https://github.com/prowler-cloud/prowler/pull/10920)
- Duplicate Kubernetes RBAC findings when the same User or Group subject appeared in multiple ClusterRoleBindings [(#10242)](https://github.com/prowler-cloud/prowler/pull/10242)
- Match K8s RBAC rules by `apiGroup` [(#10969)](https://github.com/prowler-cloud/prowler/pull/10969)
- Return a compact actor name from CloudTrail `userIdentity` events [(#10986)](https://github.com/prowler-cloud/prowler/pull/10986)
---
## [5.25.1] (Prowler v5.25.1)
### 🐞 Fixed
@@ -57,6 +71,10 @@ All notable changes to the **Prowler SDK** are documented in this file.
- Google Workspace check reports now store the actual domain or account resource subject instead of `provider.identity` [(#10901)](https://github.com/prowler-cloud/prowler/pull/10901)
- `entra_users_mfa_capable` evaluating disabled guest accounts; CIS 5.2.3.4 only targets enabled member users [(#10785)](https://github.com/prowler-cloud/prowler/pull/10785)
### 🐞 Fixed
- `load_and_validate_config_file` now unwraps namespaced config for every built-in and external provider, and no longer leaks the full file as the provider's config when the file is namespaced [(#10700)](https://github.com/prowler-cloud/prowler/pull/10700)
---
## [5.24.3] (Prowler v5.24.3)
+34 -7
@@ -10,7 +10,6 @@ from colorama import Fore, Style
from colorama import init as colorama_init
from prowler.config.config import (
    EXTERNAL_TOOL_PROVIDERS,
    cloud_api_base_url,
    csv_file_suffix,
    get_available_compliance_frameworks,
@@ -207,9 +206,10 @@ def prowler():
# We treat the compliance framework as another output format
if compliance_framework:
args.output_formats.extend(compliance_framework)
# If no input compliance framework, set all, unless a specific service or check is input
# Skip for IAC and LLM providers that don't use compliance frameworks
elif default_execution and provider not in ["iac", "llm"]:
# If no input compliance framework, set all, unless a specific service or check is input.
# Skip for tool-wrapper providers (iac, llm, image, and any external plug-in
# declaring `is_external_tool_provider = True`) — they don't use compliance frameworks.
elif default_execution and not Provider.is_tool_wrapper_provider(provider):
args.output_formats.extend(get_available_compliance_frameworks(provider))
# Set Logger configuration
@@ -247,7 +247,7 @@ def prowler():
universal_frameworks = {}
# Skip compliance frameworks for external-tool providers
if provider not in EXTERNAL_TOOL_PROVIDERS:
if not Provider.is_tool_wrapper_provider(provider):
bulk_compliance_frameworks = Compliance.get_bulk(provider)
# Complete checks metadata with the compliance framework specification
bulk_checks_metadata = update_checks_metadata_with_compliance(
@@ -315,7 +315,7 @@ def prowler():
sys.exit()
# Skip service and check loading for external-tool providers
if provider not in EXTERNAL_TOOL_PROVIDERS:
if not Provider.is_tool_wrapper_provider(provider):
# Import custom checks from folder
if checks_folder:
custom_checks = parse_checks_from_folder(global_provider, checks_folder)
@@ -426,6 +426,9 @@ def prowler():
output_options = VercelOutputOptions(
args, bulk_checks_metadata, global_provider.identity
)
else:
# Dynamic fallback: any external/custom provider
output_options = global_provider.get_output_options(args, bulk_checks_metadata)
# Run the quick inventory for the provider if available
if hasattr(args, "quick_inventory") and args.quick_inventory:
@@ -435,7 +438,7 @@ def prowler():
# Execute checks
findings = []
if provider in EXTERNAL_TOOL_PROVIDERS:
if Provider.is_tool_wrapper_provider(provider):
# For external-tool providers, run the scan directly
if provider == "llm":
@@ -1343,6 +1346,30 @@ def prowler():
                    )
                    generated_outputs["compliance"].append(generic_compliance)
                    generic_compliance.batch_write_data_to_file()
        else:
            # Dynamic fallback: any external/custom provider
            try:
                global_provider.generate_compliance_output(
                    finding_outputs,
                    bulk_compliance_frameworks,
                    input_compliance_frameworks,
                    output_options,
                    generated_outputs,
                )
            except NotImplementedError:
                # Last resort: generic compliance
                for compliance_name in input_compliance_frameworks:
                    filename = (
                        f"{output_options.output_directory}/compliance/"
                        f"{output_options.output_filename}_{compliance_name}.csv"
                    )
                    generic_compliance = GenericCompliance(
                        findings=finding_outputs,
                        compliance=bulk_compliance_frameworks[compliance_name],
                        file_path=filename,
                    )
                    generated_outputs["compliance"].append(generic_compliance)
                    generic_compliance.batch_write_data_to_file()
        # AWS Security Hub Integration
        if provider == "aws":
@@ -2897,6 +2897,7 @@
"bedrock_guardrails_configured",
"bedrock_model_invocation_logging_enabled",
"bedrock_model_invocation_logs_encryption_enabled",
"bedrock_prompt_management_exists",
"cloudformation_stack_outputs_find_secrets",
"cloudfront_distributions_custom_ssl_certificate",
"cloudfront_distributions_default_root_object",
@@ -2901,6 +2901,7 @@
"bedrock_guardrails_configured",
"bedrock_model_invocation_logging_enabled",
"bedrock_model_invocation_logs_encryption_enabled",
"bedrock_prompt_management_exists",
"cloudformation_stack_outputs_find_secrets",
"cloudfront_distributions_custom_ssl_certificate",
"cloudfront_distributions_default_root_object",
+61 -14
@@ -1,3 +1,4 @@
import importlib.metadata
import os
import pathlib
from datetime import datetime, timezone
@@ -82,13 +83,38 @@ class Provider(str, Enum):
actual_directory = pathlib.Path(os.path.dirname(os.path.realpath(__file__)))
def _get_ep_compliance_dirs() -> dict:
    """Discover compliance directories from entry points. Returns {provider: path}."""
    dirs = {}
    for ep in importlib.metadata.entry_points(group="prowler.compliance"):
        try:
            module = ep.load()
            if hasattr(module, "__path__"):
                dirs[ep.name] = module.__path__[0]
            elif hasattr(module, "__file__"):
                dirs[ep.name] = os.path.dirname(module.__file__)
        except Exception as error:
            logger.warning(
                f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
            )
    return dirs
def get_available_compliance_frameworks(provider=None):
available_compliance_frameworks = []
providers = [p.value for p in Provider]
# Built-in compliance
compliance_base = f"{actual_directory}/../compliance"
if provider:
providers = [provider]
for current_provider in providers:
compliance_dir = f"{actual_directory}/../compliance/{current_provider}"
else:
# Scan compliance directory for all provider subdirectories
providers = []
if os.path.isdir(compliance_base):
for entry in os.scandir(compliance_base):
if entry.is_dir():
providers.append(entry.name)
for prov in providers:
compliance_dir = f"{compliance_base}/{prov}"
if not os.path.isdir(compliance_dir):
continue
with os.scandir(compliance_dir) as files:
@@ -97,7 +123,8 @@ def get_available_compliance_frameworks(provider=None):
available_compliance_frameworks.append(
file.name.removesuffix(".json")
)
# Also scan top-level compliance/ for multi-provider (universal) JSONs.
# Built-in multi-provider frameworks at top-level compliance/ directory.
# Placed before external entry points so built-ins win on name collisions.
# When a specific provider was requested, only include the framework if it
# declares support for that provider; otherwise include all universal frameworks.
compliance_root = f"{actual_directory}/../compliance"
@@ -114,6 +141,18 @@ def get_available_compliance_frameworks(provider=None):
continue
if name not in available_compliance_frameworks:
available_compliance_frameworks.append(name)
# External compliance via entry points.
# Multi-provider support for external plug-ins is tracked in PROWLER-1444.
ep_dirs = _get_ep_compliance_dirs()
for prov, path in ep_dirs.items():
if provider and prov != provider:
continue
if os.path.isdir(path):
for file in os.scandir(path):
if file.is_file() and file.name.endswith(".json"):
available_compliance_frameworks.append(
file.name.removesuffix(".json")
)
return available_compliance_frameworks
@@ -225,18 +264,26 @@ def load_and_validate_config_file(provider: str, config_file_path: str) -> dict:
with open(config_file_path, "r", encoding=encoding_format_utf_8) as f:
config_file = yaml.safe_load(f)
# Not to introduce a breaking change, allow the old format config file without any provider keys
# and a new format with a key for each provider to include their configuration values within.
if any(
key in config_file
for key in ["aws", "gcp", "azure", "kubernetes", "m365"]
# Namespaced format: each provider has its own top-level key.
# Works for every built-in and every external plugin without a hardcoded list.
# Flat legacy format is AWS-only (historical, pre-multicloud). We identify it
# by the absence of nested-dict top-level values (namespaced files always
# have dict values; the legacy AWS format only has primitives/lists).
if (
isinstance(config_file, dict)
and provider in config_file
and isinstance(config_file[provider], dict)
):
config = config_file.get(provider, {})
config = config_file.get(provider, {}) or {}
elif (
isinstance(config_file, dict)
and config_file
and provider == "aws"
and not any(isinstance(v, dict) for v in config_file.values())
):
config = config_file
else:
config = config_file if config_file else {}
# Not to break Azure, K8s and GCP does not support or use the old config format
if provider in ["azure", "gcp", "kubernetes", "m365"]:
config = {}
config = {}
return config
+53 -6
@@ -1,4 +1,6 @@
import importlib
import importlib.metadata
import importlib.util
import json
import os
import re
@@ -19,6 +21,7 @@ from prowler.lib.check.utils import recover_checks_from_provider
from prowler.lib.logger import logger
from prowler.lib.outputs.outputs import report
from prowler.lib.utils.utils import open_file, parse_json_file, print_boxes
from prowler.providers.common.builtin import is_builtin_provider
from prowler.providers.common.models import Audit_Metadata
@@ -385,6 +388,45 @@ def import_check(check_path: str) -> ModuleType:
    return lib


def _resolve_check_module(
    provider_type: str, service: str, check_name: str
) -> ModuleType:
    """Resolve and import a check module.

    Built-in wins on CheckID collision. Plug-ins are first-class extenders
    (they can add new checks under new CheckIDs) but cannot override
    existing built-ins — a security tool prefers fail-loud predictability
    over silent overrides. CheckMetadata.get_bulk() applies the same
    precedence on the metadata side (first-write-wins) and emits a warning
    when a plug-in tries to override, so the user knows their plug-in
    duplicate is being ignored and can rename it.

    Gates the built-in branch on `is_builtin_provider(provider_type)` —
    calling `find_spec` on `prowler.providers.{provider_type}.services...`
    directly would propagate `ModuleNotFoundError` for external providers
    (their parent package `prowler.providers.{provider_type}` does not
    exist) instead of returning None. The leaf helper encapsulates the
    safe lookup, so external providers go straight to entry points. For
    built-ins we still use `find_spec` to distinguish "check doesn't
    exist" from "check exists but failed to import" (broken transitive
    dep, etc.).
    """
    # Built-in first — built-in wins on CheckID collision
    if is_builtin_provider(provider_type):
        builtin_path = f"prowler.providers.{provider_type}.services.{service}.{check_name}.{check_name}"
        if importlib.util.find_spec(builtin_path) is not None:
            return import_check(builtin_path)
    # Entry point lookup — only consulted when the built-in truly doesn't exist
    for ep in importlib.metadata.entry_points(group=f"prowler.checks.{provider_type}"):
        if ep.name == check_name:
            return importlib.import_module(ep.value)
    raise ModuleNotFoundError(
        f"Check '{check_name}' not found for provider '{provider_type}'"
    )
def run_fixer(check_findings: list) -> int:
"""
Run the fixer for the check if it exists and there are any FAIL findings
@@ -525,9 +567,10 @@ def execute_checks(
service = check_name.split("_")[0]
try:
try:
# Import check module
check_module_path = f"prowler.providers.{global_provider.type}.services.{service}.{check_name}.{check_name}"
lib = import_check(check_module_path)
# Import check module (built-in or entry point)
lib = _resolve_check_module(
global_provider.type, service, check_name
)
# Recover functions from check
check_to_execute = getattr(lib, check_name)
check = check_to_execute()
@@ -605,9 +648,10 @@ def execute_checks(
)
try:
try:
# Import check module
check_module_path = f"prowler.providers.{global_provider.type}.services.{service}.{check_name}.{check_name}"
lib = import_check(check_module_path)
# Import check module (built-in or entry point)
lib = _resolve_check_module(
global_provider.type, service, check_name
)
# Recover functions from check
check_to_execute = getattr(lib, check_name)
check = check_to_execute()
@@ -745,6 +789,9 @@ def execute(
is_finding_muted_args["tenancy_id"] = (
global_provider.identity.tenancy_id
)
else:
# External/custom provider — delegate identity args
is_finding_muted_args = global_provider.get_mutelist_finding_args()
for finding in check_findings:
if global_provider.type == "cloudflare":
is_finding_muted_args["account_id"] = finding.account_id
+8 -3
@@ -2,10 +2,10 @@ import sys
from colorama import Fore, Style
from prowler.config.config import EXTERNAL_TOOL_PROVIDERS
from prowler.lib.check.check import parse_checks_from_file
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.check.models import CheckMetadata, Severity
from prowler.lib.check.tool_wrapper import is_tool_wrapper_provider
from prowler.lib.logger import logger
@@ -26,8 +26,13 @@ def load_checks_to_execute(
) -> set:
"""Generate the list of checks to execute based on the cloud provider and the input arguments given"""
try:
# Bypass check loading for providers that use external tools directly
if provider in EXTERNAL_TOOL_PROVIDERS:
# Bypass check loading for tool-wrapper providers — they delegate
# scanning to an external tool and have no checks to recover.
# Single source of truth across __main__, the CheckMetadata validators,
# check discovery and this loader, covering both built-in tool wrappers
# (iac/llm/image) and external plug-ins that declare
# `is_external_tool_provider = True` via the contract.
if is_tool_wrapper_provider(provider):
return set()
# Local subsets
+35 -5
@@ -1,3 +1,4 @@
import importlib.metadata
import json
import os
import sys
@@ -434,26 +435,55 @@ class Compliance(BaseModel):
"""Bulk load all compliance frameworks specification into a dict"""
try:
bulk_compliance_frameworks = {}
# Built-in compliance from prowler/compliance/{provider}/
available_compliance_framework_modules = list_compliance_modules()
for compliance_framework in available_compliance_framework_modules:
if provider in compliance_framework.name:
compliance_specification_dir_path = (
f"{compliance_framework.module_finder.path}/{provider}"
)
# for compliance_framework in available_compliance_framework_modules:
for filename in os.listdir(compliance_specification_dir_path):
file_path = os.path.join(
compliance_specification_dir_path, filename
)
# Check if it is a file and its size is greater than 0
if os.path.isfile(file_path) and os.stat(file_path).st_size > 0:
# Open Compliance file in JSON
# cis_v1.4_aws.json --> cis_v1.4_aws
compliance_framework_name = filename.split(".json")[0]
# Store the compliance info
bulk_compliance_frameworks[compliance_framework_name] = (
load_compliance_framework(file_path)
)
# External compliance via entry points
for ep in importlib.metadata.entry_points(group="prowler.compliance"):
if ep.name == provider:
try:
module = ep.load()
compliance_dir = (
module.__path__[0]
if hasattr(module, "__path__")
else os.path.dirname(module.__file__)
)
for filename in os.listdir(compliance_dir):
if filename.endswith(".json"):
file_path = os.path.join(compliance_dir, filename)
if (
os.path.isfile(file_path)
and os.stat(file_path).st_size > 0
):
compliance_framework_name = filename.split(".json")[
0
]
if (
compliance_framework_name
not in bulk_compliance_frameworks
):
bulk_compliance_frameworks[
compliance_framework_name
] = load_compliance_framework(file_path)
except Exception as error:
logger.warning(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as e:
logger.error(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}")
+43 -19
@@ -11,10 +11,10 @@ from typing import Any, Dict, Optional, Set
from pydantic.v1 import BaseModel, Field, ValidationError, validator
from pydantic.v1.error_wrappers import ErrorWrapper
from prowler.config.config import EXTERNAL_TOOL_PROVIDERS, Provider
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.check.utils import recover_checks_from_provider
from prowler.lib.logger import logger
from prowler.providers.common.provider import Provider as ProviderABC
# Valid ResourceGroup values as defined in the RFC
VALID_RESOURCE_GROUPS = frozenset(
@@ -62,6 +62,9 @@ VALID_CATEGORIES = frozenset(
"e5",
"privilege-escalation",
"ec2-imdsv1",
"vercel-hobby-plan",
"vercel-pro-plan",
"vercel-enterprise-plan",
}
)
@@ -244,18 +247,19 @@ class CheckMetadata(BaseModel):
# store the compliance later if supplied
Compliance: Optional[list[Any]] = Field(default_factory=list)
# TODO: Remove noqa and fix cls vulture errors
@validator("Categories", each_item=True, pre=True, always=True)
def valid_category(cls, value, values):
def valid_category(cls, value, values): # noqa: F841
if not isinstance(value, str):
raise ValueError("Categories must be a list of strings")
value_lower = value.lower()
if not re.match("^[a-z0-9-]+$", value_lower):
raise ValueError(
f"Invalid category: {value}. Categories can only contain lowercase letters, numbers and hyphen '-'"
f"Invalid category: {value}. Categories can only contain lowercase letters, numbers, and hyphen '-'"
)
if (
value_lower not in VALID_CATEGORIES
and values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS
and not ProviderABC.is_tool_wrapper_provider(values.get("Provider"))
):
raise ValueError(
f"Invalid category: '{value_lower}'. Must be one of: {', '.join(sorted(VALID_CATEGORIES))}."
@@ -279,12 +283,14 @@ class CheckMetadata(BaseModel):
return resource_type
@validator("ServiceName", pre=True, always=True)
def validate_service_name(cls, service_name, values):
def validate_service_name(cls, service_name, values): # noqa: F841
if not service_name:
raise ValueError("ServiceName must be a non-empty string")
check_id = values.get("CheckID")
if check_id and values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
if check_id and not ProviderABC.is_tool_wrapper_provider(
values.get("Provider")
):
service_from_check_id = check_id.split("_")[0]
if service_name != service_from_check_id:
raise ValueError(
@@ -296,11 +302,13 @@ class CheckMetadata(BaseModel):
return service_name
@validator("CheckID", pre=True, always=True)
def valid_check_id(cls, check_id, values):
def valid_check_id(cls, check_id, values): # noqa: F841
if not check_id:
raise ValueError("CheckID must be a non-empty string")
if check_id and values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
if check_id and not ProviderABC.is_tool_wrapper_provider(
values.get("Provider")
):
if "-" in check_id:
raise ValueError(
f"CheckID {check_id} contains a hyphen, which is not allowed"
@@ -310,7 +318,7 @@ class CheckMetadata(BaseModel):
@validator("CheckTitle", pre=True, always=True)
def validate_check_title(cls, check_title, values):
if values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
if not ProviderABC.is_tool_wrapper_provider(values.get("Provider")):
if len(check_title) > 150:
raise ValueError(
f"CheckTitle must not exceed 150 characters, got {len(check_title)} characters"
@@ -323,13 +331,15 @@ class CheckMetadata(BaseModel):
@validator("RelatedUrl", pre=True, always=True)
def validate_related_url(cls, related_url, values):
if related_url and values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
if related_url and not ProviderABC.is_tool_wrapper_provider(
values.get("Provider")
):
raise ValueError("RelatedUrl must be empty. This field is deprecated.")
return related_url
@validator("Remediation")
def validate_recommendation_url(cls, remediation, values):
if values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
if not ProviderABC.is_tool_wrapper_provider(values.get("Provider")):
url = remediation.Recommendation.Url
if url and not url.startswith("https://hub.prowler.com/"):
raise ValueError(
@@ -338,11 +348,11 @@ class CheckMetadata(BaseModel):
return remediation
@validator("CheckType", pre=True, always=True)
def validate_check_type(cls, check_type, values):
def validate_check_type(cls, check_type, values): # noqa: F841
provider = values.get("Provider", "").lower()
# Non-AWS providers must have an empty CheckType list
if provider != "aws" and provider not in EXTERNAL_TOOL_PROVIDERS:
if provider != "aws" and not ProviderABC.is_tool_wrapper_provider(provider):
if check_type:
raise ValueError(
f"CheckType must be empty for non-AWS providers. Got {check_type} for provider '{provider}'."
@@ -368,7 +378,7 @@ class CheckMetadata(BaseModel):
@validator("Description", pre=True, always=True)
def validate_description(cls, description, values):
if values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
if not ProviderABC.is_tool_wrapper_provider(values.get("Provider")):
if len(description) > 400:
raise ValueError(
f"Description must not exceed 400 characters, got {len(description)} characters"
@@ -377,7 +387,7 @@ class CheckMetadata(BaseModel):
@validator("Risk", pre=True, always=True)
def validate_risk(cls, risk, values):
if values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
if not ProviderABC.is_tool_wrapper_provider(values.get("Provider")):
if len(risk) > 400:
raise ValueError(
f"Risk must not exceed 400 characters, got {len(risk)} characters"
@@ -385,7 +395,7 @@ class CheckMetadata(BaseModel):
return risk
@validator("ResourceGroup", pre=True, always=True)
def validate_resource_group(cls, resource_group):
def validate_resource_group(cls, resource_group): # noqa: F841
if resource_group and resource_group not in VALID_RESOURCE_GROUPS:
raise ValueError(
f"Invalid ResourceGroup: '{resource_group}'. Must be one of: {', '.join(sorted(VALID_RESOURCE_GROUPS))} or empty string."
@@ -393,7 +403,7 @@ class CheckMetadata(BaseModel):
return resource_group
@validator("AdditionalURLs", pre=True, always=True)
def validate_additional_urls(cls, additional_urls):
def validate_additional_urls(cls, additional_urls): # noqa: F841
if not isinstance(additional_urls, list):
raise ValueError("AdditionalURLs must be a list")
@@ -429,6 +439,20 @@ class CheckMetadata(BaseModel):
metadata_file = f"{check_path}/{check_name}.metadata.json"
# Load metadata
check_metadata = load_check_metadata(metadata_file)
# Built-in wins on CheckID collision. Plug-in entry points are
# appended after built-ins by `recover_checks_from_provider`, so
# a duplicate CheckID here means an entry-point check is trying
# to override a built-in. Ignore the override (the built-in
# metadata stays) and surface it via a warning — matching the
# precedence enforced by `_resolve_check_module`.
if check_metadata.CheckID in bulk_check_metadata:
logger.warning(
f"Plug-in check metadata '{check_metadata.CheckID}' "
f"(loaded from '{metadata_file}') is being IGNORED — "
f"a built-in with the same CheckID exists. To use your "
f"plug-in, register it under a different CheckID."
)
continue
bulk_check_metadata[check_metadata.CheckID] = check_metadata
return bulk_check_metadata
@@ -466,7 +490,7 @@ class CheckMetadata(BaseModel):
# If the bulk checks metadata is not provided, get it
if not bulk_checks_metadata:
bulk_checks_metadata = {}
available_providers = [p.value for p in Provider]
available_providers = ProviderABC.get_available_providers()
for provider_name in available_providers:
bulk_checks_metadata.update(CheckMetadata.get_bulk(provider_name))
if provider:
@@ -491,7 +515,7 @@ class CheckMetadata(BaseModel):
# Loaded here, as it is not always needed
if not bulk_compliance_frameworks:
bulk_compliance_frameworks = {}
available_providers = [p.value for p in Provider]
available_providers = ProviderABC.get_available_providers()
for provider in available_providers:
bulk_compliance_frameworks = Compliance.get_bulk(provider=provider)
checks_from_compliance_framework = (
+57
@@ -0,0 +1,57 @@
"""Standalone helper for tool-wrapper provider detection.
A provider is a "tool wrapper" if it delegates scanning to an external tool
(Trivy, promptfoo, etc.) instead of running checks/services through the
standard Prowler engine. This module is the single source of truth for that
classification across the codebase.
Kept as a leaf module with no Prowler imports beyond the leaf
`external_tool_providers` so it can be referenced from `prowler.lib.check.*`
and `prowler.providers.common.provider` without forming an import cycle.
"""
import importlib.metadata
from prowler.lib.check.external_tool_providers import EXTERNAL_TOOL_PROVIDERS
# Module-level cache for entry-point classes consulted by this helper.
# Independent of `Provider._ep_providers` to keep this module leaf — the cost
# of a duplicate cache entry is negligible (one class object per external
# provider, loaded lazily on first lookup).
_ep_class_cache: dict = {}
def _load_ep_class(provider: str):
"""Return the entry-point provider class for `provider`, or None.
Caches the result in `_ep_class_cache`. Errors during entry-point loading
are swallowed (returning None) so a broken plug-in never crashes the
is-tool-wrapper check; it just falls through to "not a tool wrapper".
"""
if provider in _ep_class_cache:
return _ep_class_cache[provider]
for ep in importlib.metadata.entry_points(group="prowler.providers"):
if ep.name == provider:
try:
cls = ep.load()
except Exception:
cls = None
_ep_class_cache[provider] = cls
return cls
_ep_class_cache[provider] = None
return None
def is_tool_wrapper_provider(provider: str) -> bool:
"""Return True if the provider delegates scanning to an external tool.
Combines the built-in `EXTERNAL_TOOL_PROVIDERS` frozenset (fast path for
iac/llm/image) with the `is_external_tool_provider` class attribute of
external plug-ins registered via entry points. This is the single source
of truth consulted by `__main__`, the `CheckMetadata` validators, the
check-loading utilities, and the checks loader.
"""
if provider in EXTERNAL_TOOL_PROVIDERS:
return True
cls = _load_ep_class(provider)
return bool(cls and getattr(cls, "is_external_tool_provider", False))
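# Illustrative sketch: how a hypothetical plug-in opts in to the tool-wrapper
# classification above. `AcmeScannerProvider` and the entry-point name are
# assumptions; the contract consulted by `is_tool_wrapper_provider` is just
# the class attribute, readable without instantiation.
class AcmeScannerProvider:  # would normally subclass prowler's Provider ABC
    is_external_tool_provider = True  # read via getattr() in the helper above

# Registered in the plug-in's pyproject.toml:
#   [project.entry-points."prowler.providers"]
#   acme-scanner = "acme_plugin.provider:AcmeScannerProvider"
# After install, is_tool_wrapper_provider("acme-scanner") returns True.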
+84 -23
@@ -1,9 +1,43 @@
import importlib
import importlib.metadata
import importlib.util
import os
import sys
from pkgutil import walk_packages
from prowler.lib.check.external_tool_providers import EXTERNAL_TOOL_PROVIDERS
from prowler.lib.check.tool_wrapper import is_tool_wrapper_provider
from prowler.lib.logger import logger
from prowler.providers.common.builtin import is_builtin_provider
def _recover_ep_checks(provider: str, service: str = None) -> list[tuple]:
"""Discover external checks registered via entry points for a provider.
External plugins follow the same layout as built-ins:
`{plugin_root}.services.{service}.{check}.{check}`
When `service` is provided, only entry points whose dotted path contains
`.services.{service}.` are included, mirroring how built-in discovery
filters by the `prowler.providers.{provider}.services.{service}` package.
Uses find_spec to locate the check module without importing it,
avoiding service client initialization at discovery time.
"""
checks = []
for ep in importlib.metadata.entry_points(group=f"prowler.checks.{provider}"):
try:
if service and f".services.{service}." not in ep.value:
continue
spec = importlib.util.find_spec(ep.value)
if spec and spec.origin:
check_path = os.path.dirname(spec.origin)
checks.append((ep.name, check_path))
except Exception as error:
logger.warning(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return checks
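# Illustrative sketch: the entry-point layout `_recover_ep_checks` expects,
# using hypothetical names. The assumed contract is the group
# `prowler.checks.{provider}` plus a dotted path containing
# `.services.{service}.`, mirroring the built-in layout:
#
#   # pyproject.toml of a plug-in shipping checks for provider "acme"
#   [project.entry-points."prowler.checks.acme"]
#   acme_storage_encrypted = "acme_plugin.services.storage.acme_storage_encrypted.acme_storage_encrypted"
#
# Because the dotted value contains `.services.storage.`, the check is also
# discoverable when the user passes `--service storage`. The discovery loop
# above can be reproduced standalone:
import importlib.metadata
import importlib.util
import os

for ep in importlib.metadata.entry_points(group="prowler.checks.acme"):
    spec = importlib.util.find_spec(ep.value)  # locate without importing
    if spec and spec.origin:
        print(ep.name, os.path.dirname(spec.origin))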
def recover_checks_from_provider(
@@ -15,29 +49,55 @@ def recover_checks_from_provider(
Returns a list of tuples with the following format (check_name, check_path)
"""
try:
# Bypass check loading for providers that use external tools directly
if provider in EXTERNAL_TOOL_PROVIDERS:
# Bypass check loading for tool-wrapper providers — they delegate
# scanning to an external tool and have no checks to recover.
# Single source of truth: combines the EXTERNAL_TOOL_PROVIDERS
# frozenset (built-ins) with the per-provider `is_external_tool_provider`
# class attribute (so external plug-ins opt in via the contract).
if is_tool_wrapper_provider(provider):
return []
checks = []
modules = list_modules(provider, service)
for module_name in modules:
# Format: "prowler.providers.{provider}.services.{service}.{check_name}.{check_name}"
check_module_name = module_name.name
# We need to exclude common shared libraries in services
if (
check_module_name.count(".") == 6
and ".lib." not in check_module_name
and (not check_module_name.endswith("_fixer") or include_fixers)
):
check_path = module_name.module_finder.path
# Check name is the last part of the check_module_name
check_name = check_module_name.split(".")[-1]
check_info = (check_name, check_path)
checks.append(check_info)
except ModuleNotFoundError:
logger.critical(f"Service {service} was not found for the {provider} provider.")
sys.exit(1)
# Built-in checks from prowler.providers.{provider}.services. Gate
# the built-in branch on `is_builtin_provider(provider)` — calling
# `find_spec` directly on `prowler.providers.{provider}.services`
# would propagate `ModuleNotFoundError` when the parent package
# `prowler.providers.{provider}` does not exist (i.e. the provider
# is external), instead of returning None. The leaf helper
# encapsulates the safe lookup, so we only run the built-in
# discovery when the provider actually ships with the SDK; for
# external providers we go straight to entry points.
if is_builtin_provider(provider):
modules = list_modules(provider, service)
for module_name in modules:
# Format: "prowler.providers.{provider}.services.{service}.{check_name}.{check_name}"
check_module_name = module_name.name
# We need to exclude common shared libraries in services
if (
check_module_name.count(".") == 6
and ".lib." not in check_module_name
and (not check_module_name.endswith("_fixer") or include_fixers)
):
check_path = module_name.module_finder.path
check_name = check_module_name.split(".")[-1]
check_info = (check_name, check_path)
checks.append(check_info)
# External checks registered via entry points — always consulted, with
# optional service filter. Previously gated by `if not service:`, which
# prevented external providers from being usable with --service.
checks.extend(_recover_ep_checks(provider, service))
# A service was requested but nothing matched in either built-ins or
# entry points — surface this as a clear error instead of silently
# returning an empty list.
if service and not checks:
logger.critical(
f"Service '{service}' was not found for the '{provider}' provider "
f"(neither as a built-in nor via external entry points)."
)
sys.exit(1)
except Exception as e:
logger.critical(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}]: {e}")
sys.exit(1)
@@ -64,8 +124,9 @@ def recover_checks_from_service(service_list: list, provider: str) -> set:
Returns a set of checks from the given services
"""
try:
# Bypass check loading for providers that use external tools directly
if provider in EXTERNAL_TOOL_PROVIDERS:
# Bypass check loading for tool-wrapper providers — symmetric with
# `recover_checks_from_provider` above, using the same source of truth.
if is_tool_wrapper_provider(provider):
return set()
checks = set()
+48 -7
@@ -20,19 +20,58 @@ from prowler.providers.common.arguments import (
validate_provider_arguments,
validate_sarif_usage,
)
from prowler.providers.common.provider import Provider
class ProwlerArgumentParser:
# Set the default parser
def __init__(self):
# Discover any providers not in the hardcoded list below
# TODO - First step to support current providers and the new external provider implementation
known_providers = {
"aws",
"azure",
"gcp",
"kubernetes",
"m365",
"github",
"googleworkspace",
"cloudflare",
"oraclecloud",
"openstack",
"alibabacloud",
"iac",
"llm",
"image",
"nhn",
"mongodbatlas",
"vercel",
}
all_providers = set(Provider.get_available_providers())
new_providers = sorted(all_providers - known_providers)
# Build extra strings for dynamically discovered providers
extra_providers_csv = ""
extra_providers_text = ""
if new_providers:
providers_help = Provider.get_providers_help_text()
extra_providers_csv = "," + ",".join(new_providers)
extra_lines = []
for name in new_providers:
help_text = providers_help.get(name, "")
if help_text:
extra_lines.append(f" {name:<20}{help_text}")
if extra_lines:
extra_providers_text = "\n" + "\n".join(extra_lines)
# CLI Arguments
self.parser = argparse.ArgumentParser(
prog="prowler",
formatter_class=RawTextHelpFormatter,
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,googleworkspace,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,vercel,dashboard,iac,image,llm} ...",
epilog="""
usage=f"prowler [-h] [--version] {{aws,azure,gcp,kubernetes,m365,github,googleworkspace,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,vercel,dashboard,iac,image,llm{extra_providers_csv}}} ...",
epilog=f"""
Available Cloud Providers:
{aws,azure,gcp,kubernetes,m365,github,googleworkspace,iac,llm,image,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,vercel}
{{aws,azure,gcp,kubernetes,m365,github,googleworkspace,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,vercel,dashboard,iac,image,llm{extra_providers_csv}}}
aws AWS Provider
azure Azure Provider
gcp GCP Provider
@@ -49,13 +88,13 @@ Available Cloud Providers:
image Container Image Provider
nhn NHN Provider (Unofficial)
mongodbatlas MongoDB Atlas Provider
vercel Vercel Provider
vercel Vercel Provider{extra_providers_text}
Available components:
dashboard Local dashboard
To see the different available options on a specific component, run:
prowler {provider|dashboard} -h|--help
prowler {{provider|dashboard}} -h|--help
Detailed documentation at https://docs.prowler.com
""",
@@ -114,8 +153,10 @@ Detailed documentation at https://docs.prowler.com
and (sys.argv[1] not in ("-v", "--version"))
):
# Since the provider is always the second argument, we are checking if
# a flag, starting by "-", is supplied
if "-" in sys.argv[1]:
# a flag is supplied. Use startswith("-") instead of "in" to avoid
# matching external provider names that contain hyphens
# (e.g. "local-acme-snowflake").
if sys.argv[1].startswith("-"):
sys.argv = self.__set_default_provider__(sys.argv)
# Provider aliases mapping
+26 -8
View File
@@ -243,14 +243,32 @@ def display_compliance_table(
compliance_overview,
)
else:
get_generic_compliance_table(
findings,
bulk_checks_metadata,
compliance_framework,
output_filename,
output_directory,
compliance_overview,
)
# Try provider-specific table first, fall back to generic
from prowler.providers.common.provider import Provider
provider = Provider.get_global_provider()
handled = False
if provider is not None:
try:
handled = provider.display_compliance_table(
findings,
bulk_checks_metadata,
compliance_framework,
output_filename,
output_directory,
compliance_overview,
)
except NotImplementedError:
handled = False
if not handled:
get_generic_compliance_table(
findings,
bulk_checks_metadata,
compliance_framework,
output_filename,
output_directory,
compliance_overview,
)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
+5
@@ -474,6 +474,11 @@ class Finding(BaseModel):
check_output, "fixed_version", ""
)
else:
# Dynamic fallback: any external/custom provider
provider_data = provider.get_finding_output_data(check_output)
output_data.update(provider_data)
# check_output Unique ID
# TODO: move this to a function
# TODO: in Azure, GCP and K8s there are findings without resource_name
+7 -5
@@ -1417,11 +1417,13 @@ class HTML(Output):
# Azure_provider --> azure
# Kubernetes_provider --> kubernetes
# Dynamically get the Provider quick inventory handler
provider_html_assessment_summary_function = (
f"get_{provider.type}_assessment_summary"
)
return getattr(HTML, provider_html_assessment_summary_function)(provider)
# Try static method first, fall back to provider method
method_name = f"get_{provider.type}_assessment_summary"
if hasattr(HTML, method_name):
return getattr(HTML, method_name)(provider)
else:
# Dynamic fallback: any external/custom provider
return provider.get_html_assessment_summary()
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
+33 -19
@@ -7,39 +7,46 @@ from prowler.lib.outputs.common import Status
from prowler.lib.outputs.finding import Finding
def stdout_report(finding, color, verbose, status, fix):
def stdout_report(finding, color, verbose, status, fix, provider=None):
if finding.check_metadata.Provider == "aws":
details = finding.region
if finding.check_metadata.Provider == "azure":
elif finding.check_metadata.Provider == "azure":
details = finding.location
if finding.check_metadata.Provider == "gcp":
elif finding.check_metadata.Provider == "gcp":
details = finding.location.lower()
if finding.check_metadata.Provider == "kubernetes":
elif finding.check_metadata.Provider == "kubernetes":
details = finding.namespace.lower()
if finding.check_metadata.Provider == "github":
elif finding.check_metadata.Provider == "github":
details = finding.owner
if finding.check_metadata.Provider == "m365":
elif finding.check_metadata.Provider == "m365":
details = finding.location
if finding.check_metadata.Provider == "mongodbatlas":
elif finding.check_metadata.Provider == "mongodbatlas":
details = finding.location
if finding.check_metadata.Provider == "nhn":
elif finding.check_metadata.Provider == "nhn":
details = finding.location
if finding.check_metadata.Provider == "llm":
elif finding.check_metadata.Provider == "llm":
details = finding.check_metadata.CheckID
if finding.check_metadata.Provider == "iac":
elif finding.check_metadata.Provider == "iac":
details = finding.check_metadata.CheckID
if finding.check_metadata.Provider == "oraclecloud":
elif finding.check_metadata.Provider == "oraclecloud":
details = finding.region
if finding.check_metadata.Provider == "alibabacloud":
elif finding.check_metadata.Provider == "alibabacloud":
details = finding.region
if finding.check_metadata.Provider == "openstack":
elif finding.check_metadata.Provider == "openstack":
details = finding.region
if finding.check_metadata.Provider == "cloudflare":
elif finding.check_metadata.Provider == "cloudflare":
details = finding.zone_name
if finding.check_metadata.Provider == "googleworkspace":
elif finding.check_metadata.Provider == "googleworkspace":
details = finding.location
if finding.check_metadata.Provider == "vercel":
elif finding.check_metadata.Provider == "vercel":
details = finding.region
else:
# Dynamic fallback: any external/custom provider
if provider is None:
from prowler.providers.common.provider import Provider
provider = Provider.get_global_provider()
details = provider.get_stdout_detail(finding)
if (verbose or fix) and (not status or finding.status in status):
if finding.muted:
@@ -59,12 +66,15 @@ def report(check_findings, provider, output_options):
if hasattr(output_options, "verbose"):
verbose = output_options.verbose
if check_findings:
# TO-DO Generic Function
if provider.type == "aws":
check_findings.sort(key=lambda x: x.region)
if provider.type == "azure":
elif provider.type == "azure":
check_findings.sort(key=lambda x: x.subscription)
else:
# Dynamic fallback: any external/custom provider
sort_key = provider.get_finding_sort_key()
if sort_key and isinstance(sort_key, str):
check_findings.sort(key=lambda x: getattr(x, sort_key, ""))
for finding in check_findings:
# Print findings by stdout
@@ -75,12 +85,16 @@ def report(check_findings, provider, output_options):
if hasattr(output_options, "fixer"):
fixer = output_options.fixer
color = set_report_color(finding.status, finding.muted)
# Pass the local `provider` through so the dynamic else inside
# `stdout_report` does not have to consult the global singleton,
# which would defeat the whole purpose of the new parameter.
stdout_report(
finding,
color,
verbose,
status,
fixer,
provider=provider,
)
else: # No service resources in the whole account
+3
@@ -108,6 +108,9 @@ def display_summary_table(
)
else:
audited_entities = provider.identity.username or "Personal Account"
else:
# Dynamic fallback: any external/custom provider
entity_type, audited_entities = provider.get_summary_entity()
# Check if there are findings and that they are not all MANUAL
if findings and not all(finding.status == "MANUAL" for finding in findings):
+9 -4
@@ -4,8 +4,8 @@ from types import SimpleNamespace
from typing import Generator
from prowler.lib.check.check import (
_resolve_check_module,
execute,
import_check,
list_services,
update_audit_metadata,
)
@@ -426,9 +426,14 @@ class Scan:
# Recover service from check name
service = get_service_name_from_check_name(check_name)
try:
# Import check module
check_module_path = f"prowler.providers.{self._provider.type}.services.{service}.{check_name}.{check_name}"
lib = import_check(check_module_path)
# Import check module (built-in or entry point) —
# delegates to `_resolve_check_module` so external
# providers registered via entry points are resolved
# correctly (their checks do not live under
# `prowler.providers.{type}.services...`).
lib = _resolve_check_module(
self._provider.type, service, check_name
)
# Recover functions from check
check_to_execute = getattr(lib, check_name)
check = check_to_execute()
@@ -221,27 +221,12 @@ class CloudTrailTimeline(TimelineService):
@staticmethod
def _extract_actor(user_identity: Dict[str, Any]) -> str:
"""Extract a human-readable actor name from CloudTrail userIdentity."""
# Try ARN first - most reliable
"""Return a compact actor name from CloudTrail userIdentity.
For ARNs, returns the resource portion (everything after the last
`:`), e.g. `user/alice`, `assumed-role/MyRole/session-name`,
`root`. The full ARN is preserved separately in `actor_uid`.
"""
if arn := user_identity.get("arn"):
if "/" in arn:
parts = arn.split("/")
# For assumed-role, return the role name (second-to-last part)
if "assumed-role" in arn and len(parts) >= 2:
return parts[-2]
return parts[-1]
return arn.split(":")[-1]
# Fall back to userName
if username := user_identity.get("userName"):
return username
# Fall back to principalId
if principal_id := user_identity.get("principalId"):
return principal_id
# For service-invoked actions
if invoking_service := user_identity.get("invokedBy"):
return invoking_service
return "Unknown"
return arn.rsplit(":", 1)[-1]
return user_identity.get("invokedBy") or "Unknown"
@@ -0,0 +1,39 @@
{
"Provider": "aws",
"CheckID": "bedrock_prompt_management_exists",
"CheckTitle": "Amazon Bedrock Prompt Management prompts exist in the region",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices"
],
"ServiceName": "bedrock",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "low",
"ResourceType": "Other",
"ResourceGroup": "ai_ml",
"Description": "**Bedrock Prompt Management** enables centralized creation, versioning, and governance of prompts used with foundation models.\n\nThis region-level check verifies whether at least one managed prompt exists in each scanned region, used as an adoption signal for Prompt Management. The presence of a prompt does not by itself guarantee that every application prompt is managed.",
"Risk": "Without **Prompt Management**, prompts are scattered across applications with no central oversight, versioning, or auditability over instructions sent to foundation models, weakening governance and compliance posture.\n\nManaged prompts are a governance enabler; **prompt injection** defenses are provided by Bedrock **guardrails**, covered by separate checks.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.aws.amazon.com/bedrock/latest/userguide/prompt-management.html",
"https://docs.aws.amazon.com/bedrock/latest/userguide/prompt-management-create.html"
],
"Remediation": {
"Code": {
"CLI": "aws bedrock-agent create-prompt --name example_prompt --default-variant default --variants '[{\"name\":\"default\",\"templateType\":\"TEXT\",\"templateConfiguration\":{\"text\":{\"text\":\"Your prompt template here.\"}}}]'",
"NativeIaC": "",
"Other": "1. Open the Amazon Bedrock console\n2. Navigate to Prompt Management\n3. Click Create prompt\n4. Provide a name and configure the prompt template (a prompt can contain at most one variant; additional variants are created via CreatePromptVersion)\n5. Save the prompt",
"Terraform": ""
},
"Recommendation": {
"Text": "Adopt **Bedrock Prompt Management** to centralize prompt definitions, enforce versioning, and maintain governance over model interactions.\n\nUse managed prompts with **guardrails** and apply **least privilege** access controls to restrict who can create or modify prompts.",
"Url": "https://hub.prowler.com/check/bedrock_prompt_management_exists"
}
},
"Categories": [
"gen-ai"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Results are generated per scanned region. Regions where `ListPrompts` cannot be queried are omitted from the findings."
}
@@ -0,0 +1,54 @@
"""Check for region-level Bedrock Prompt Management adoption."""
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.bedrock.bedrock_agent_client import (
bedrock_agent_client,
)
class bedrock_prompt_management_exists(Check):
"""Check whether Amazon Bedrock Prompt Management prompts exist in the region.
A region is reported only when ListPrompts succeeded for it; regions where
the API call failed (e.g. AccessDenied, unsupported region) are skipped at
the service layer and produce no finding.
- PASS: At least one managed prompt exists in the region (one finding per prompt).
- FAIL: No managed prompts exist in the region (one finding per region).
"""
def execute(self) -> list[Check_Report_AWS]:
"""Execute the Bedrock Prompt Management exists check.
Returns:
A list of reports containing the result of the check.
"""
findings = []
for region in sorted(bedrock_agent_client.prompt_scanned_regions):
regional_prompts = sorted(
(
prompt
for prompt in bedrock_agent_client.prompts.values()
if prompt.region == region
),
key=lambda prompt: prompt.name,
)
if regional_prompts:
for prompt in regional_prompts:
report = Check_Report_AWS(metadata=self.metadata(), resource=prompt)
report.status = "PASS"
report.status_extended = f"Bedrock Prompt Management prompt {prompt.name} exists in region {region}."
findings.append(report)
else:
report = Check_Report_AWS(metadata=self.metadata(), resource={})
report.region = region
report.resource_id = "prompt-management"
report.resource_arn = f"arn:{bedrock_agent_client.audited_partition}:bedrock:{region}:{bedrock_agent_client.audited_account}:prompt-management"
report.status = "FAIL"
report.status_extended = (
f"No Bedrock Prompt Management prompts exist in region {region}."
)
findings.append(report)
return findings
@@ -140,7 +140,10 @@ class BedrockAgent(AWSService):
# Call AWSService's __init__
super().__init__("bedrock-agent", provider)
self.agents = {}
self.prompts = {}
self.prompt_scanned_regions: set = set()
self.__threading_call__(self._list_agents)
self.__threading_call__(self._list_prompts)
self.__threading_call__(self._list_tags_for_resource, self.agents.values())
def _list_agents(self, regional_client):
@@ -167,7 +170,32 @@ class BedrockAgent(AWSService):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _list_prompts(self, regional_client):
"""List all prompts in a region.
Prompt Management is evaluated as a region-level adoption signal, so
prompt collection is intentionally not filtered by audit_resources.
"""
logger.info("Bedrock Agent - Listing Prompts...")
try:
paginator = regional_client.get_paginator("list_prompts")
for page in paginator.paginate():
for prompt in page.get("promptSummaries", []):
prompt_arn = prompt.get("arn", "")
self.prompts[prompt_arn] = Prompt(
id=prompt.get("id", ""),
name=prompt.get("name", ""),
arn=prompt_arn,
region=regional_client.region,
)
self.prompt_scanned_regions.add(regional_client.region)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _list_tags_for_resource(self, resource):
"""List tags for a Bedrock Agent resource."""
logger.info("Bedrock Agent - Listing Tags for Resource...")
try:
agent_tags = (
@@ -190,3 +218,12 @@ class Agent(BaseModel):
guardrail_id: Optional[str] = None
region: str
tags: Optional[list] = []
class Prompt(BaseModel):
"""Model representing a Bedrock Prompt Management prompt."""
id: str
name: str
arn: str
region: str
+35 -12
@@ -16,18 +16,41 @@ def init_providers_parser(self):
# We need to call the arguments parser for each provider
providers = Provider.get_available_providers()
for provider in providers:
try:
getattr(
import_module(
f"{providers_path}.{provider}.{provider_arguments_lib_path}"
),
init_provider_arguments_function,
)(self)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit(1)
# Discriminate built-in vs external upfront via find_spec, so an
# ImportError from a transitive dependency missing inside a built-in
# arguments module surfaces clearly instead of being silently
# re-routed to the entry-point path (which only has external providers).
if Provider.is_builtin(provider):
try:
getattr(
import_module(
f"{providers_path}.{provider}.{provider_arguments_lib_path}"
),
init_provider_arguments_function,
)(self)
except ImportError as e:
logger.critical(
f"Failed to load arguments for built-in provider '{provider}'. "
f"Missing dependency: {e}. "
f"Ensure all required dependencies are installed."
)
logger.debug("Full traceback:", exc_info=True)
sys.exit(1)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit(1)
else:
# External provider — init_parser classmethod via entry point
cls = Provider._load_ep_provider(provider)
if cls and hasattr(cls, "init_parser"):
try:
cls.init_parser(self)
except Exception as error:
logger.warning(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
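# Illustrative sketch: the init_parser hook consumed by the entry-point
# branch above. The subcommand name, the flag, and the `subparsers`
# attribute on the parser object are all assumptions for this sketch.
class AcmeProvider:
    @classmethod
    def init_parser(cls, arg_parser):
        # arg_parser is the ProwlerArgumentParser instance passed in above;
        # assuming it exposes its argparse subparsers as `subparsers`.
        acme = arg_parser.subparsers.add_parser(
            "acme", help="ACME Cloud Provider"
        )
        acme.add_argument("--acme-api-key", help="ACME API key")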
def validate_provider_arguments(arguments: Namespace) -> tuple[bool, str]:
+29
@@ -0,0 +1,29 @@
"""Leaf helper for built-in provider detection.
Lives in its own module with no imports back into `prowler.lib.check` so
that callers in `prowler.lib.check.*` can ask "is this provider built-in?"
without creating an import cycle through `prowler.providers.common.provider`
(which transitively imports `prowler.config.config` and from there
`prowler.lib.check.compliance_models` / `prowler.lib.check.external_tool_providers`).
Same rationale as `prowler.lib.check.tool_wrapper`: extracting the predicate
to a leaf module is the canonical way to break the cycle in this codebase.
"""
import importlib.util
def is_builtin_provider(provider: str) -> bool:
"""Return True if the provider's own package ships with the SDK.
Wraps `importlib.util.find_spec` in `try/except (ImportError, ValueError)`
because `find_spec` propagates `ModuleNotFoundError` when a parent package
in the dotted path does not exist (instead of returning `None`). The
try/except is what makes the call safe for external providers, whose
package does not live under `prowler.providers.{provider}`.
"""
try:
spec = importlib.util.find_spec(f"prowler.providers.{provider}")
return spec is not None
except (ImportError, ValueError):
return False
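# Illustrative sketch: why the try/except above matters. "acme" and
# "acme.cloud" are hypothetical provider names on a standard install.
from prowler.providers.common.builtin import is_builtin_provider

print(is_builtin_provider("aws"))         # True: prowler.providers.aws exists
print(is_builtin_provider("acme"))        # False: find_spec returns None
print(is_builtin_provider("acme.cloud"))  # False: ModuleNotFoundError for the
                                          # missing parent is swallowed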
+273 -28
@@ -1,4 +1,6 @@
import importlib
import importlib.metadata
import importlib.util
import os
import pkgutil
import sys
@@ -136,6 +138,108 @@ class Provider(ABC):
"""
return set()
# --- Dynamic provider contract methods (not @abstractmethod for incremental migration) ---
_cli_help_text: str = ""
@classmethod
def from_cli_args(cls, arguments: Namespace, fixer_config: dict) -> "Provider":
"""Instantiate the provider from CLI arguments and return the instance.
The caller wires the returned instance into the global provider slot
via Provider.set_global_provider(). Implementations that already call
set_global_provider(self) from __init__ are also supported the call
site tolerates a None return in that case.
"""
raise NotImplementedError(f"{cls.__name__} has not implemented from_cli_args()")
def get_output_options(self, arguments, bulk_checks_metadata):
"""Create the provider-specific OutputOptions."""
raise NotImplementedError(
f"{self.__class__.__name__} has not implemented get_output_options()"
)
def get_stdout_detail(self, finding) -> str:
"""Return the detail string for stdout reporting (region, location, etc.)."""
raise NotImplementedError(
f"{self.__class__.__name__} has not implemented get_stdout_detail()"
)
def get_finding_sort_key(self) -> Optional[str]:
"""Return the attribute name to sort findings by, or None for no sorting."""
return None
def get_summary_entity(self) -> tuple:
"""Return (entity_type, audited_entities) for the summary table."""
raise NotImplementedError(
f"{self.__class__.__name__} has not implemented get_summary_entity()"
)
def get_finding_output_data(self, _check_output) -> dict:
"""Return provider-specific fields for Finding.generate_output()."""
raise NotImplementedError(
f"{self.__class__.__name__} has not implemented get_finding_output_data()"
)
def get_html_assessment_summary(self) -> str:
"""Return the HTML assessment summary card for this provider."""
raise NotImplementedError(
f"{self.__class__.__name__} has not implemented get_html_assessment_summary()"
)
def generate_compliance_output(
self,
findings,
bulk_compliance_frameworks,
_input_compliance_frameworks,
output_options,
_generated_outputs,
) -> None:
"""Generate compliance CSV output for this provider's frameworks."""
raise NotImplementedError(
f"{self.__class__.__name__} has not implemented generate_compliance_output()"
)
def get_mutelist_finding_args(self) -> dict:
"""Return extra kwargs for mutelist.is_finding_muted() besides 'finding'.
External providers must return a dict with the identity key their
Mutelist subclass expects, e.g. ``{"account_id": self.identity.account_id}``.
The ``finding`` kwarg is added automatically by the caller.
"""
raise NotImplementedError(
f"{self.__class__.__name__} has not implemented get_mutelist_finding_args()"
)
def display_compliance_table(
self,
findings: list,
bulk_checks_metadata: dict,
compliance_framework: str,
_output_filename: str,
_output_directory: str,
_compliance_overview: bool,
) -> bool:
"""Render a custom compliance table in the terminal.
External providers can override this to display a detailed
compliance table (e.g., per-section breakdown). Return True
if the table was rendered, False to fall back to the generic table.
"""
raise NotImplementedError(
f"{self.__class__.__name__} has not implemented display_compliance_table()"
)
# Class-level flag: True for providers that delegate scanning to an external
# tool (e.g. Trivy, promptfoo) and bypass standard check/service loading and
# metadata validation. Subclasses override as `is_external_tool_provider = True`.
# Kept as a class attribute (not a property) so it can be read from the class
# without instantiation — the metadata validators in lib.check.models need to
# decide whether to relax validation before any provider instance exists.
is_external_tool_provider: bool = False
# --- End dynamic provider contract methods ---
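# Illustrative sketch: the minimum surface a hypothetical external provider
# might implement against the contract above before registering itself under
# the `prowler.providers` entry-point group. Field names such as
# `identity.account_id` are assumptions about the plug-in's own models.
class AcmeProvider(Provider):
    _cli_help_text = "ACME Cloud Provider"

    @classmethod
    def from_cli_args(cls, arguments, fixer_config):
        provider = cls()  # hypothetical no-arg constructor for this sketch
        return provider  # caller wires it via Provider.set_global_provider()

    def get_stdout_detail(self, finding) -> str:
        return finding.region  # whichever detail field the findings carry

    def get_summary_entity(self) -> tuple:
        return ("Account", self.identity.account_id)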
@staticmethod
def get_excluded_regions_from_env() -> set:
"""Parse the PROWLER_AWS_DISALLOWED_REGIONS environment variable.
@@ -159,20 +263,70 @@ class Provider(ABC):
@staticmethod
def init_global_provider(arguments: Namespace) -> None:
try:
provider_class_path = (
f"{providers_path}.{arguments.provider}.{arguments.provider}_provider"
)
provider_class_name = f"{arguments.provider.capitalize()}Provider"
provider_class = getattr(
import_module(provider_class_path), provider_class_name
# Discriminate built-in vs external upfront via find_spec, so an
# ImportError from a transitive dependency missing inside a
# built-in's own import chain surfaces clearly instead of being
# silently re-routed to the entry-point path.
provider_class = None
if Provider.is_builtin(arguments.provider):
# Built-in wins on provider-name collision. Plug-ins are
# first-class extenders (they can register new provider
# names) but cannot override existing built-ins — a security
# tool prefers fail-loud predictability over silent
# overrides. Surface the override so the user knows their
# plug-in is being ignored and can rename it.
if Provider._load_ep_provider(arguments.provider) is not None:
logger.warning(
f"Plug-in provider '{arguments.provider}' registered "
f"via entry points is being IGNORED — a built-in with "
f"the same name exists. To use your plug-in, register "
f"it under a different name."
)
provider_class_path = f"{providers_path}.{arguments.provider}.{arguments.provider}_provider"
provider_class_name = f"{arguments.provider.capitalize()}Provider"
try:
provider_class = getattr(
import_module(provider_class_path), provider_class_name
)
except ImportError as e:
logger.critical(
f"Failed to load built-in provider '{arguments.provider}'. "
f"Missing dependency: {e}. "
f"Ensure all required dependencies are installed."
)
logger.debug("Full traceback:", exc_info=True)
sys.exit(1)
except AttributeError:
# Module exists but doesn't define the expected class —
# treat as external and try entry points.
provider_class = Provider._load_ep_provider(arguments.provider)
else:
provider_class = Provider._load_ep_provider(arguments.provider)
if provider_class is None:
raise ImportError(
f"Provider '{arguments.provider}' not found as built-in or entry point"
)
# Kept for downstream forks that may extend the dispatch below
# with their own custom built-in branches and reference this name.
# The upstream chain dispatches by `arguments.provider` directly.
provider_class_name = (
f"{arguments.provider.capitalize()}Provider" # noqa: F841
)
fixer_config = load_and_validate_config_file(
arguments.provider, arguments.fixer_config
)
# Dispatch by exact provider name (equality, not substring) so
# external plug-ins whose names contain a built-in substring
# (e.g. `awsx`, `azure_gov`, `iac_v2`) cannot be silently routed
# to the wrong built-in branch. Anything that doesn't match a
# built-in falls through to the dynamic else and uses the
# contract's `from_cli_args`.
if not isinstance(Provider._global, provider_class):
if "aws" in provider_class_name.lower():
if arguments.provider == "aws":
excluded_regions = (
set(arguments.excluded_region)
if getattr(arguments, "excluded_region", None)
@@ -196,7 +350,7 @@ class Provider(ABC):
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
elif "azure" in provider_class_name.lower():
elif arguments.provider == "azure":
provider_class(
az_cli_auth=arguments.az_cli_auth,
sp_env_auth=arguments.sp_env_auth,
@@ -209,7 +363,7 @@ class Provider(ABC):
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
elif "gcp" in provider_class_name.lower():
elif arguments.provider == "gcp":
provider_class(
retries_max_attempts=arguments.gcp_retries_max_attempts,
organization_id=arguments.organization_id,
@@ -223,7 +377,7 @@ class Provider(ABC):
fixer_config=fixer_config,
skip_api_check=arguments.skip_api_check,
)
elif "kubernetes" in provider_class_name.lower():
elif arguments.provider == "kubernetes":
provider_class(
kubeconfig_file=arguments.kubeconfig_file,
context=arguments.context,
@@ -233,7 +387,7 @@ class Provider(ABC):
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
elif "m365" in provider_class_name.lower():
elif arguments.provider == "m365":
provider_class(
region=arguments.region,
config_path=arguments.config_file,
@@ -247,7 +401,7 @@ class Provider(ABC):
init_modules=arguments.init_modules,
fixer_config=fixer_config,
)
elif "nhn" in provider_class_name.lower():
elif arguments.provider == "nhn":
provider_class(
username=arguments.nhn_username,
password=arguments.nhn_password,
@@ -256,7 +410,7 @@ class Provider(ABC):
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
elif "github" in provider_class_name.lower():
elif arguments.provider == "github":
orgs = []
repos = []
@@ -288,13 +442,13 @@ class Provider(ABC):
exclude_workflows=getattr(arguments, "exclude_workflows", []),
fixer_config=fixer_config,
)
elif "googleworkspace" in provider_class_name.lower():
elif arguments.provider == "googleworkspace":
provider_class(
config_path=arguments.config_file,
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
elif "cloudflare" in provider_class_name.lower():
elif arguments.provider == "cloudflare":
provider_class(
filter_zones=arguments.region,
filter_accounts=arguments.account_id,
@@ -302,7 +456,7 @@ class Provider(ABC):
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
elif "iac" in provider_class_name.lower():
elif arguments.provider == "iac":
provider_class(
scan_path=arguments.scan_path,
scan_repository_url=arguments.scan_repository_url,
@@ -315,13 +469,13 @@ class Provider(ABC):
oauth_app_token=arguments.oauth_app_token,
provider_uid=arguments.provider_uid,
)
elif "llm" in provider_class_name.lower():
elif arguments.provider == "llm":
provider_class(
max_concurrency=arguments.max_concurrency,
config_path=arguments.config_file,
fixer_config=fixer_config,
)
elif "image" in provider_class_name.lower():
elif arguments.provider == "image":
provider_class(
images=arguments.images,
image_list_file=arguments.image_list_file,
@@ -339,7 +493,7 @@ class Provider(ABC):
registry_insecure=arguments.registry_insecure,
registry_list_images=arguments.registry_list_images,
)
elif "mongodbatlas" in provider_class_name.lower():
elif arguments.provider == "mongodbatlas":
provider_class(
atlas_public_key=arguments.atlas_public_key,
atlas_private_key=arguments.atlas_private_key,
@@ -348,7 +502,7 @@ class Provider(ABC):
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
elif "oraclecloud" in provider_class_name.lower():
elif arguments.provider == "oraclecloud":
provider_class(
oci_config_file=arguments.oci_config_file,
profile=arguments.profile,
@@ -359,7 +513,7 @@ class Provider(ABC):
fixer_config=fixer_config,
use_instance_principal=arguments.use_instance_principal,
)
elif "openstack" in provider_class_name.lower():
elif arguments.provider == "openstack":
provider_class(
clouds_yaml_file=getattr(arguments, "clouds_yaml_file", None),
clouds_yaml_content=getattr(
@@ -384,7 +538,7 @@ class Provider(ABC):
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
elif "alibabacloud" in provider_class_name.lower():
elif arguments.provider == "alibabacloud":
provider_class(
role_arn=arguments.role_arn,
role_session_name=arguments.role_session_name,
@@ -396,13 +550,25 @@ class Provider(ABC):
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
elif "vercel" in provider_class_name.lower():
elif arguments.provider == "vercel":
provider_class(
projects=getattr(arguments, "project", None),
config_path=arguments.config_file,
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
else:
# Dynamic fallback: any external/custom provider.
# Honor the from_cli_args type hint (-> Provider): if the
# implementation returns an instance, wire it as the global
# provider here. Implementations that call
# set_global_provider(self) from __init__ return None and
# remain supported (the condition below is a no-op for them).
provider_instance = provider_class.from_cli_args(
arguments, fixer_config
)
if provider_instance is not None:
Provider.set_global_provider(provider_instance)
except TypeError as error:
logger.critical(
@@ -415,17 +581,96 @@ class Provider(ABC):
)
sys.exit(1)
# Cache for entry-point provider classes {name: class}
_ep_providers: dict = {}
@staticmethod
def get_available_providers() -> list[str]:
"""get_available_providers returns a list of the available providers"""
providers = []
# Dynamically import the package based on its string path
providers = set()
# Built-in providers from local package
prowler_providers = importlib.import_module(providers_path)
# Iterate over all modules found in the prowler_providers package
for _, provider, ispkg in pkgutil.iter_modules(prowler_providers.__path__):
if provider != "common" and ispkg:
providers.append(provider)
return providers
providers.add(provider)
# External providers registered via entry points
for ep in importlib.metadata.entry_points(group="prowler.providers"):
providers.add(ep.name)
return sorted(providers)
@staticmethod
def is_tool_wrapper_provider(provider: str) -> bool:
"""Return True if the provider delegates scanning to an external tool.
Delegates to `prowler.lib.check.tool_wrapper.is_tool_wrapper_provider`,
the leaf module that holds the actual logic. Kept on `Provider` as a
convenience entry point for callers that already import `Provider`.
"""
from prowler.lib.check.tool_wrapper import is_tool_wrapper_provider as _impl
return _impl(provider)
@staticmethod
def is_builtin(provider: str) -> bool:
"""Return True if the provider's own package is importable as a built-in.
Delegates to `prowler.providers.common.builtin.is_builtin_provider`,
the leaf module that holds the actual check. Kept on `Provider` as a
convenience entry point for callers that already import `Provider`.
Call sites in `prowler.lib.check.*` should import from the leaf
directly to avoid the import cycle through this module.
"""
from prowler.providers.common.builtin import is_builtin_provider as _impl
return _impl(provider)
@staticmethod
def _load_ep_provider(name: str):
"""Load an external provider class from entry points, with cache."""
if name in Provider._ep_providers:
return Provider._ep_providers[name]
for ep in importlib.metadata.entry_points(group="prowler.providers"):
if ep.name == name:
try:
cls = ep.load()
Provider._ep_providers[name] = cls
return cls
except Exception as error:
logger.warning(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return None
@staticmethod
def get_providers_help_text() -> dict:
"""Returns a dict of {provider_name: cli_help_text} for all available providers."""
help_text = {}
for name in Provider.get_available_providers():
try:
# Try built-in first
module_path = f"{providers_path}.{name}.{name}_provider"
module = import_module(module_path)
cls = None
for attr_name in dir(module):
attr = getattr(module, attr_name)
if (
isinstance(attr, type)
and issubclass(attr, Provider)
and attr is not Provider
):
cls = attr
break
help_text[name] = getattr(cls, "_cli_help_text", "") if cls else ""
except ImportError:
# External provider — load via entry point
cls = Provider._load_ep_provider(name)
help_text[name] = getattr(cls, "_cli_help_text", "") if cls else ""
except Exception as error:
logger.warning(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
help_text[name] = ""
return help_text
@staticmethod
def update_provider_config(audit_config: dict, variable: str, value: str):
+15 -5
@@ -329,12 +329,21 @@ class ImageProvider(Provider):
"""Image provider doesn't need a session since it uses Trivy directly"""
return None
@staticmethod
def _strip_scheme(value: str) -> str:
"""Remove a leading http:// or https:// scheme from a registry input."""
for prefix in ("https://", "http://"):
if value.lower().startswith(prefix):
return value[len(prefix) :]
return value
@staticmethod
def _extract_registry(image: str) -> str | None:
"""Extract registry hostname from an image reference.
Returns None for Docker Hub images (no registry prefix).
"""
image = ImageProvider._strip_scheme(image)
parts = image.split("/")
if len(parts) >= 2 and ("." in parts[0] or ":" in parts[0]):
return parts[0]
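# Illustrative sketch: expected behavior of the helpers above on a few
# representative (hypothetical) inputs.
#   _strip_scheme("https://ghcr.io/org/app:1.0")  -> "ghcr.io/org/app:1.0"
#   _strip_scheme("HTTP://myregistry.com:5000")   -> "myregistry.com:5000"
#   _strip_scheme("alpine:3.18")                  -> "alpine:3.18" (no scheme)
#   _extract_registry("ghcr.io/org/app:1.0")      -> "ghcr.io"
#   _extract_registry("nginx")                    -> None (Docker Hub image)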
@@ -348,6 +357,7 @@ class ImageProvider(Provider):
or "myregistry.com:5000" are registry URLs (dots in host, no slash).
Image references like "alpine:3.18" or "nginx" are not.
"""
image_uid = ImageProvider._strip_scheme(image_uid)
if "/" not in image_uid:
host_part = image_uid.split(":")[0]
if "." in host_part:
@@ -835,11 +845,9 @@ class ImageProvider(Provider):
image_ref = f"{repo}:{tag}"
else:
# OCI registries need the full host/repo:tag reference
registry_host = self.registry.rstrip("/")
for prefix in ("https://", "http://"):
if registry_host.startswith(prefix):
registry_host = registry_host[len(prefix) :]
break
registry_host = ImageProvider._strip_scheme(
self.registry.rstrip("/")
)
image_ref = f"{registry_host}/{repo}:{tag}"
discovered_images.append(image_ref)
@@ -977,6 +985,8 @@ class ImageProvider(Provider):
if not image:
return Connection(is_connected=False, error="Image name is required")
image = ImageProvider._strip_scheme(image)
# Registry URL (bare hostname) → test via OCI catalog
if ImageProvider._is_registry_url(image):
return ImageProvider._test_registry_connection(
@@ -1,36 +1,37 @@
def is_rule_allowing_permissions(rules, resources, verbs):
def is_rule_allowing_permissions(rules, resources, verbs, api_groups=("",)):
"""
Check Kubernetes role permissions.
Check whether any RBAC rule grants the specified verbs on the specified
resources within the specified API groups.
This function takes in Kubernetes role rules, resources, and verbs,
and checks if any of the rules grant permissions on the specified
resources with the specified verbs.
A rule matches when its `apiGroups` includes any of `api_groups` (or "*"),
its `resources` includes any of `resources` (or "*"), and its `verbs`
includes any of `verbs` (or "*").
Args:
rules (List[Rule]): The list of Kubernetes role rules.
resources (List[str]): The list of resources to check permissions for.
verbs (List[str]): The list of verbs to check permissions for.
rules (List[Rule]): RBAC rules from a Role or ClusterRole.
resources (List[str]): Resources (or sub-resources) to check.
verbs (List[str]): Verbs to check.
api_groups (Iterable[str]): API groups the resources live in. Defaults
to ("",), the core API group, which matches the most common case.
Pass an explicit value for resources outside the core group, e.g.
("admissionregistration.k8s.io",) for webhook configurations.
Returns:
bool: True if any of the rules grant permissions, False otherwise.
bool: True if any rule grants the permission, False otherwise.
"""
if rules:
# Iterate through each rule in the list of rules
for rule in rules:
# Ensure apiGroups are relevant ("" or "v1" for secrets)
if rule.apiGroups and all(api not in ["", "v1"] for api in rule.apiGroups):
continue # Skip rules with unrelated apiGroups
# Check if the rule has resources, verbs, and matches any of the specified resources and verbs
if (
rule.resources
and (
any(resource in rule.resources for resource in resources)
or "*" in rule.resources
)
and rule.verbs
and (any(verb in rule.verbs for verb in verbs) or "*" in rule.verbs)
):
# If the rule matches, return True
return True
# If no rule matches, return False
if not rules:
return False
for rule in rules:
rule_api_groups = rule.apiGroups or [""]
if not (
any(g in rule_api_groups for g in api_groups) or "*" in rule_api_groups
):
continue
if (
rule.resources
and (any(r in rule.resources for r in resources) or "*" in rule.resources)
and rule.verbs
and (any(v in rule.verbs for v in verbs) or "*" in rule.verbs)
):
return True
return False
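# Illustrative sketch: the apiGroup matching above on toy rules, assuming the
# function is in scope. SimpleNamespace stands in for the RBAC Rule model.
from types import SimpleNamespace as Rule

csr_rule = Rule(
    apiGroups=["certificates.k8s.io"],
    resources=["certificatesigningrequests/approval"],
    verbs=["update"],
)
core_rule = Rule(apiGroups=[""], resources=["secrets"], verbs=["get"])

# Matches only when the non-core group is passed explicitly:
assert is_rule_allowing_permissions(
    [csr_rule],
    ["certificatesigningrequests/approval"],
    ["update"],
    api_groups=("certificates.k8s.io",),
)
# The default ("",) keeps core-group semantics, so the CSR rule is skipped:
assert not is_rule_allowing_permissions(
    [csr_rule], ["certificatesigningrequests/approval"], ["update"]
)
assert is_rule_allowing_permissions([core_rule], ["secrets"], ["get"])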
@@ -6,29 +6,40 @@ from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client
verbs = ["update", "patch"]
resources = ["certificatesigningrequests/approval"]
api_groups = ["certificates.k8s.io"]
class rbac_minimize_csr_approval_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to update the CSR approval sub-resource."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(
cr.rules,
resources,
verbs,
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to update the CSR approval sub-resource."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to update the CSR approval sub-resource."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(
cr.rules, resources, verbs, api_groups
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to update the CSR approval sub-resource."
break
findings.append(report)
return findings
@@ -11,20 +11,32 @@ resources = ["nodes/proxy"]
class rbac_minimize_node_proxy_subresource_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to the node proxy sub-resource."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to the node proxy sub-resource."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to the node proxy sub-resource."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to the node proxy sub-resource."
break
findings.append(report)
return findings
@@ -11,21 +11,32 @@ resources = ["persistentvolumes"]
class rbac_minimize_pv_creation_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Check each ClusterRoleBinding for access to create PersistentVolumes
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create PersistentVolumes."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create PersistentVolumes."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create PersistentVolumes."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create PersistentVolumes."
break
findings.append(report)
return findings
@@ -11,20 +11,32 @@ resources = ["serviceaccounts/token"]
class rbac_minimize_service_account_token_creation(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create service account tokens."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create service account tokens."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create service account tokens."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create service account tokens."
break
findings.append(report)
return findings
@@ -9,29 +9,40 @@ resources = [
"mutatingwebhookconfigurations",
]
verbs = ["create", "update", "delete"]
api_groups = ["admissionregistration.k8s.io"]
class rbac_minimize_webhook_config_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create, update, or delete webhook configurations."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(
cr.rules,
resources,
verbs,
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create, update, or delete webhook configurations."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create, update, or delete webhook configurations."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(
cr.rules, resources, verbs, api_groups
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create, update, or delete webhook configurations."
break
findings.append(report)
return findings
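A quick usage sketch of both helpers (payloads hypothetical): extraction tolerates missing or malformed input and normalises case, and the suffix is emitted only when the plan provably lacks the feature.

assert extract_billing_plan({"billing": {"plan": "PRO"}}) == "pro"
assert extract_billing_plan({"billing": "not-a-dict"}) is None
assert extract_billing_plan(None) is None

explanation = "SAML SSO is not available on the Vercel Hobby plan."
assert plan_reason_suffix("hobby", {"hobby"}, explanation) == (
    " This may be expected because " + explanation
)
# An unknown plan (None) or a supported plan yields no suffix.
assert plan_reason_suffix(None, {"hobby"}, explanation) == ""
assert plan_reason_suffix("enterprise", {"hobby"}, explanation) == ""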
@@ -0,0 +1,27 @@
from typing import Optional
def extract_billing_plan(data: Optional[dict]) -> Optional[str]:
"""Return the Vercel billing plan from a user or team payload.
Vercel's REST API consistently returns the plan identifier at
``data["billing"]["plan"]`` (e.g. ``"hobby"``, ``"pro"``, ``"enterprise"``)
on both ``GET /v2/user`` and ``GET /v2/teams`` responses, even though the
field is not part of the public OpenAPI schema.
"""
if not isinstance(data, dict):
return None
billing = data.get("billing")
if not isinstance(billing, dict):
return None
plan = billing.get("plan")
return plan.lower() if isinstance(plan, str) else None
def plan_reason_suffix(
billing_plan: Optional[str], unsupported_plans: set[str], explanation: str
) -> str:
"""Return a plan-based explanation suffix only when the plan proves it."""
if billing_plan in unsupported_plans:
return f" This may be expected because {explanation}"
return ""
@@ -84,10 +84,10 @@ class VercelService:
)
if response.status_code == 403:
# Plan limitation or permission error — return None for graceful handling
logger.warning(
# Endpoint unavailable for this token/scope; let checks handle it gracefully
logger.info(
f"{self.service} - Access denied for {path} (403). "
"This may be a plan limitation."
"This may be caused by plan or permission restrictions."
)
return None
@@ -21,6 +21,7 @@ class VercelTeamInfo(BaseModel):
id: str
name: str
slug: str
billing_plan: Optional[str] = None
class VercelIdentityInfo(BaseModel):
@@ -29,9 +30,27 @@ class VercelIdentityInfo(BaseModel):
user_id: Optional[str] = None
username: Optional[str] = None
email: Optional[str] = None
billing_plan: Optional[str] = None
team: Optional[VercelTeamInfo] = None
teams: list[VercelTeamInfo] = Field(default_factory=list)
def get_billing_plan_for(self, scope_id: Optional[str]) -> Optional[str]:
"""Return the billing plan for an explicit user or team scope."""
if not scope_id:
return None
if self.team and self.team.id == scope_id and self.team.billing_plan:
return self.team.billing_plan
for team in self.teams:
if team.id == scope_id:
return team.billing_plan
if self.user_id == scope_id:
return self.billing_plan
return None
class VercelOutputOptions(ProviderOutputOptions):
"""Customize output filenames for Vercel scans."""
@@ -28,7 +28,8 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,7 +28,8 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,7 +28,8 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [],
@@ -28,7 +28,8 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,7 +28,8 @@
}
},
"Categories": [
"encryption"
"encryption",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,7 +28,8 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,7 +28,8 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [],
@@ -28,7 +28,8 @@
}
},
"Categories": [
"internet-exposed"
"internet-exposed",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,7 +28,8 @@
}
},
"Categories": [
"internet-exposed"
"internet-exposed",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [],
@@ -28,7 +28,8 @@
}
},
"Categories": [
"secrets"
"secrets",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,7 +28,8 @@
}
},
"Categories": [
"secrets"
"secrets",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,7 +28,8 @@
}
},
"Categories": [
"secrets"
"secrets",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,7 +28,8 @@
}
},
"Categories": [
"internet-exposed"
"internet-exposed",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [],
@@ -28,11 +28,12 @@
}
},
"Categories": [
"internet-exposed"
"internet-exposed",
"vercel-pro-plan"
],
"DependsOn": [],
"RelatedTo": [
"project_deployment_protection_enabled"
],
"Notes": ""
"Notes": "Required billing plan: Enterprise, or as a paid add-on for Pro plans."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.project.project_client import project_client
@@ -38,6 +39,7 @@ class project_password_protection_enabled(Check):
report.status_extended = (
f"Project {project.name} does not have password protection "
f"configured for deployments."
f"{plan_reason_suffix(project.billing_plan, {'hobby'}, 'password protection is not available on the Vercel Hobby plan.')}"
)
findings.append(report)
@@ -28,11 +28,12 @@
}
},
"Categories": [
"internet-exposed"
"internet-exposed",
"vercel-pro-plan"
],
"DependsOn": [],
"RelatedTo": [
"project_deployment_protection_enabled"
],
"Notes": ""
"Notes": "Protecting production deployments requires Enterprise, or Pro plans with supported paid deployment protection options."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.project.project_client import project_client
@@ -38,6 +39,7 @@ class project_production_deployment_protection_enabled(Check):
report.status_extended = (
f"Project {project.name} does not have deployment protection "
f"enabled on production deployments."
f"{plan_reason_suffix(project.billing_plan, {'hobby'}, 'protecting production deployments is not available on the Vercel Hobby plan.')}"
)
findings.append(report)
@@ -20,6 +20,7 @@ class Project(VercelService):
"""List all projects, optionally filtered by --project argument."""
try:
raw_projects = self._paginate("/v9/projects", "projects")
identity = getattr(self.provider, "identity", None)
filter_projects = self.provider.filter_projects
seen_ids: set[str] = set()
@@ -57,10 +58,17 @@ class Project(VercelService):
pwd_protection = proj.get("passwordProtection")
security = proj.get("security", {}) or {}
project_team_id = proj.get("accountId") or self.provider.session.team_id
self.projects[project_id] = VercelProject(
id=project_id,
name=project_name,
team_id=proj.get("accountId") or self.provider.session.team_id,
team_id=project_team_id,
billing_plan=(
identity.get_billing_plan_for(project_team_id)
if identity
else None
),
framework=proj.get("framework"),
node_version=proj.get("nodeVersion"),
auto_expose_system_envs=proj.get("autoExposeSystemEnvs", False),
@@ -160,6 +168,7 @@ class VercelProject(BaseModel):
id: str
name: str
team_id: Optional[str] = None
billing_plan: Optional[str] = None
framework: Optional[str] = None
node_version: Optional[str] = None
auto_expose_system_envs: bool = False
@@ -28,9 +28,10 @@
}
},
"Categories": [
"resilience"
"resilience",
"vercel-pro-plan"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
"Notes": "Required billing plan: Pro or Enterprise."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.project.project_client import project_client
@@ -34,6 +35,7 @@ class project_skew_protection_enabled(Check):
report.status_extended = (
f"Project {project.name} does not have skew protection enabled, "
f"which may cause version mismatches during deployments."
f"{plan_reason_suffix(project.billing_plan, {'hobby'}, 'skew protection is not available on the Vercel Hobby plan.')}"
)
findings.append(report)
@@ -28,11 +28,12 @@
}
},
"Categories": [
"internet-exposed"
"internet-exposed",
"vercel-pro-plan"
],
"DependsOn": [],
"RelatedTo": [
"security_waf_enabled"
],
"Notes": ""
"Notes": "Required billing plan: Pro or Enterprise."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.security.security_client import security_client
@@ -24,7 +25,16 @@ class security_custom_rules_configured(Check):
for config in security_client.firewall_configs.values():
report = CheckReportVercel(metadata=self.metadata(), resource=config)
if config.custom_rules:
if not config.firewall_config_accessible:
report.status = "MANUAL"
report.status_extended = (
f"Project {config.project_name} ({config.project_id}) "
f"could not be assessed for custom firewall rules because the "
f"firewall configuration endpoint was not accessible. "
f"Manual verification is required."
f"{plan_reason_suffix(config.billing_plan, {'hobby'}, 'custom firewall rules are not available on the Vercel Hobby plan.')}"
)
elif config.custom_rules:
report.status = "PASS"
report.status_extended = (
f"Project {config.project_name} ({config.project_id}) "
@@ -28,11 +28,12 @@
}
},
"Categories": [
"internet-exposed"
"internet-exposed",
"vercel-pro-plan"
],
"DependsOn": [],
"RelatedTo": [
"security_waf_enabled"
],
"Notes": ""
"Notes": "Required billing plan: Pro or Enterprise."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.security.security_client import security_client
@@ -25,7 +26,16 @@ class security_ip_blocking_rules_configured(Check):
for config in security_client.firewall_configs.values():
report = CheckReportVercel(metadata=self.metadata(), resource=config)
if config.ip_blocking_rules:
if not config.firewall_config_accessible:
report.status = "MANUAL"
report.status_extended = (
f"Project {config.project_name} ({config.project_id}) "
f"could not be assessed for IP blocking rules because the "
f"firewall configuration endpoint was not accessible. "
f"Manual verification is required."
f"{plan_reason_suffix(config.billing_plan, {'hobby'}, 'IP blocking rules are not available on the Vercel Hobby plan.')}"
)
elif config.ip_blocking_rules:
report.status = "PASS"
report.status_extended = (
f"Project {config.project_name} ({config.project_id}) "
@@ -9,7 +9,7 @@
"Severity": "high",
"ResourceType": "NotDefined",
"ResourceGroup": "security",
"Description": "**Vercel projects** are assessed for **managed WAF ruleset** enablement. Managed rulesets are curated by Vercel and provide protection against known attack patterns including **OWASP Top 10** threats. This feature requires an Enterprise plan and reports MANUAL status when unavailable.",
"Description": "**Vercel projects** are assessed for **managed WAF ruleset** enablement. Managed rulesets are curated by Vercel and provide protection against known attack patterns including **OWASP Top 10** threats. Availability varies by ruleset, and the check reports MANUAL when the firewall configuration cannot be assessed from the API.",
"Risk": "Without **managed rulesets** enabled, the firewall lacks curated protection rules against well-known attack patterns. The application relies solely on custom rules, which may miss **new or evolving threats** that managed rulesets are designed to detect and block automatically.",
"RelatedUrl": "",
"AdditionalURLs": [
@@ -19,20 +19,21 @@
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the Vercel dashboard\n2. Navigate to the project Settings > Security > Firewall\n3. Enable managed rulesets from the available options\n4. Review and configure ruleset sensitivity levels\n5. Note: This feature requires an Enterprise plan",
"Other": "1. Sign in to the Vercel dashboard\n2. Navigate to the project Settings > Security > Firewall\n3. Enable the managed rulesets that are available for your plan\n4. Review and configure ruleset sensitivity levels\n5. If the API does not expose firewall configuration for the project, verify the rulesets manually in the dashboard",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable managed WAF rulesets to benefit from Vercel-curated protection against common attack patterns. If you are on a plan that does not support managed rulesets, consider upgrading to the Enterprise plan for enhanced security features.",
"Text": "Enable the managed WAF rulesets that are available for your Vercel plan to benefit from curated protection against common attack patterns. If the API does not expose firewall configuration for the project, verify the rulesets manually in the dashboard.",
"Url": "https://hub.prowler.com/check/security_managed_rulesets_enabled"
}
},
"Categories": [
"internet-exposed"
"internet-exposed",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [
"security_waf_enabled"
],
"Notes": "This check is plan-gated. If the Vercel API returns a 403 for managed rulesets, the check reports MANUAL status indicating that an Enterprise plan is required."
"Notes": "Managed ruleset availability varies by ruleset. OWASP Core Ruleset requires Enterprise, while Bot Protection and AI Bots managed rulesets are available on all plans."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.security.security_client import security_client
@@ -17,8 +18,8 @@ class security_managed_rulesets_enabled(Check):
"""Execute the Vercel Managed Rulesets Enabled check.
Iterates over all firewall configurations and checks if managed
rulesets are enabled. Reports MANUAL status when the feature is
not available due to plan limitations.
rulesets are enabled. Reports MANUAL status when the firewall
configuration cannot be assessed from the API.
Returns:
List[CheckReportVercel]: A list of reports for each project.
@@ -27,12 +28,14 @@ class security_managed_rulesets_enabled(Check):
for config in security_client.firewall_configs.values():
report = CheckReportVercel(metadata=self.metadata(), resource=config)
if config.managed_rulesets is None:
if not config.firewall_config_accessible:
report.status = "MANUAL"
report.status_extended = (
f"Project {config.project_name} ({config.project_id}) "
f"could not be assessed for managed rulesets. "
f"Enterprise plan required to access this feature."
f"could not be assessed for managed rulesets because the "
f"firewall configuration endpoint was not accessible. "
f"Manual verification is required."
f"{plan_reason_suffix(config.billing_plan, {'hobby', 'pro'}, 'some managed WAF rulesets, including the OWASP Core Ruleset, are only available on Vercel Enterprise plans.')}"
)
elif config.managed_rulesets:
report.status = "PASS"
@@ -28,11 +28,12 @@
}
},
"Categories": [
"internet-exposed"
"internet-exposed",
"vercel-pro-plan"
],
"DependsOn": [],
"RelatedTo": [
"security_waf_enabled"
],
"Notes": ""
"Notes": "Required billing plan: Pro or Enterprise."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.security.security_client import security_client
@@ -24,7 +25,16 @@ class security_rate_limiting_configured(Check):
for config in security_client.firewall_configs.values():
report = CheckReportVercel(metadata=self.metadata(), resource=config)
if config.rate_limiting_rules:
if not config.firewall_config_accessible:
report.status = "MANUAL"
report.status_extended = (
f"Project {config.project_name} ({config.project_id}) "
f"could not be assessed for rate limiting rules because the "
f"firewall configuration endpoint was not accessible. "
f"Manual verification is required."
f"{plan_reason_suffix(config.billing_plan, {'hobby'}, 'rate limiting rules are not available on the Vercel Hobby plan.')}"
)
elif config.rate_limiting_rules:
report.status = "PASS"
report.status_extended = (
f"Project {config.project_name} ({config.project_id}) "
@@ -29,11 +29,13 @@ class Security(VercelService):
data = self._read_firewall_config(project)
if data is None:
# 403 — plan limitation, store with managed_rulesets=None
# Firewall config endpoint unavailable for this project/token
self.firewall_configs[project.id] = VercelFirewallConfig(
project_id=project.id,
project_name=project.name,
team_id=project.team_id,
billing_plan=project.billing_plan,
firewall_config_accessible=False,
firewall_enabled=False,
managed_rulesets=None,
name=project.name,
@@ -49,6 +51,8 @@ class Security(VercelService):
project_id=project.id,
project_name=project.name,
team_id=project.team_id,
billing_plan=project.billing_plan,
firewall_config_accessible=True,
firewall_enabled=(
fallback_firewall_enabled
if fallback_firewall_enabled is not None
@@ -93,6 +97,8 @@ class Security(VercelService):
project_id=project.id,
project_name=project.name,
team_id=project.team_id,
billing_plan=project.billing_plan,
firewall_config_accessible=True,
firewall_enabled=firewall_enabled,
managed_rulesets=managed,
custom_rules=custom_rules,
@@ -246,8 +252,10 @@ class VercelFirewallConfig(BaseModel):
project_id: str
project_name: Optional[str] = None
team_id: Optional[str] = None
billing_plan: Optional[str] = None
firewall_config_accessible: bool = True
firewall_enabled: bool = False
managed_rulesets: Optional[dict] = None # None means plan-gated (403)
managed_rulesets: Optional[dict] = None # None means config endpoint unavailable
custom_rules: list[dict] = Field(default_factory=list)
ip_blocking_rules: list[dict] = Field(default_factory=list)
rate_limiting_rules: list[dict] = Field(default_factory=list)
@@ -28,12 +28,13 @@
}
},
"Categories": [
"internet-exposed"
"internet-exposed",
"vercel-pro-plan"
],
"DependsOn": [],
"RelatedTo": [
"security_managed_rulesets_enabled",
"security_custom_rules_configured"
],
"Notes": ""
"Notes": "Required billing plan: Pro or Enterprise."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.security.security_client import security_client
@@ -24,13 +25,15 @@ class security_waf_enabled(Check):
for config in security_client.firewall_configs.values():
report = CheckReportVercel(metadata=self.metadata(), resource=config)
if config.managed_rulesets is None:
# 403 — plan limitation, cannot determine WAF status
if not config.firewall_config_accessible:
# Firewall config could not be retrieved for this project
report.status = "MANUAL"
report.status_extended = (
f"Project {config.project_name} ({config.project_id}) "
f"could not be checked for WAF status due to plan limitations. "
f"could not be checked for WAF status because the firewall "
f"configuration endpoint was not accessible. "
f"Manual verification is required."
f"{plan_reason_suffix(config.billing_plan, {'hobby'}, 'the Web Application Firewall is not available on the Vercel Hobby plan.')}"
)
elif config.firewall_enabled:
report.status = "PASS"
@@ -29,11 +29,12 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-enterprise-plan"
],
"DependsOn": [],
"RelatedTo": [
"team_saml_sso_enabled"
],
"Notes": ""
"Notes": "Required billing plan: Enterprise."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.team.team_client import team_client
@@ -40,6 +41,7 @@ class team_directory_sync_enabled(Check):
report.status_extended = (
f"Team {team.name} does not have directory sync (SCIM) enabled. "
f"User provisioning and deprovisioning must be managed manually."
f"{plan_reason_suffix(team.billing_plan, {'hobby', 'pro'}, 'directory sync (SCIM) is only available on Vercel Enterprise plans.')}"
)
findings.append(report)
@@ -28,7 +28,8 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [],
@@ -28,7 +28,8 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [],
@@ -29,11 +29,12 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-pro-plan"
],
"DependsOn": [],
"RelatedTo": [
"team_saml_sso_enforced"
],
"Notes": ""
"Notes": "Required billing plan: Pro or Enterprise."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.team.team_client import team_client
@@ -38,6 +39,7 @@ class team_saml_sso_enabled(Check):
report.status = "FAIL"
report.status_extended = (
f"Team {team.name} does not have SAML SSO enabled."
f"{plan_reason_suffix(team.billing_plan, {'hobby'}, 'SAML SSO is not available on the Vercel Hobby plan.')}"
)
findings.append(report)
@@ -29,11 +29,12 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-pro-plan"
],
"DependsOn": [],
"RelatedTo": [
"team_saml_sso_enabled"
],
"Notes": ""
"Notes": "Required billing plan: Pro or Enterprise."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.team.team_client import team_client
@@ -43,6 +44,7 @@ class team_saml_sso_enforced(Check):
else:
report.status_extended = (
f"Team {team.name} does not have SAML SSO enforced."
f"{plan_reason_suffix(team.billing_plan, {'hobby'}, 'SAML SSO is not available on the Vercel Hobby plan.')}"
)
findings.append(report)
@@ -4,6 +4,7 @@ from typing import Optional
from pydantic import BaseModel, Field
from prowler.lib.logger import logger
from prowler.providers.vercel.lib.billing import extract_billing_plan
from prowler.providers.vercel.lib.service.service import VercelService
@@ -67,6 +68,7 @@ class Team(VercelService):
id=team_data.get("id", team_id),
name=team_data.get("name", ""),
slug=team_data.get("slug", ""),
billing_plan=extract_billing_plan(team_data),
saml=saml_config,
directory_sync_enabled=dir_sync,
created_at=created_at,
@@ -151,6 +153,7 @@ class VercelTeam(BaseModel):
id: str
name: str
slug: str
billing_plan: Optional[str] = None
saml: Optional[SAMLConfig] = None
directory_sync_enabled: bool = False
members: list[VercelTeamMember] = Field(default_factory=list)
@@ -20,6 +20,7 @@ from prowler.providers.vercel.exceptions.exceptions import (
VercelRateLimitError,
VercelSessionError,
)
from prowler.providers.vercel.lib.billing import extract_billing_plan
from prowler.providers.vercel.lib.mutelist.mutelist import VercelMutelist
from prowler.providers.vercel.models import (
VercelIdentityInfo,
@@ -195,6 +196,7 @@ class VercelProvider(Provider):
user_id = user_data.get("id")
username = user_data.get("username")
email = user_data.get("email")
billing_plan = extract_billing_plan(user_data)
# Get team info
team_info = None
@@ -214,6 +216,7 @@ class VercelProvider(Provider):
id=team_data.get("id", session.team_id),
name=team_data.get("name", ""),
slug=team_data.get("slug", ""),
billing_plan=extract_billing_plan(team_data),
)
all_teams = [team_info]
elif team_response.status_code in (404, 403):
@@ -239,6 +242,7 @@ class VercelProvider(Provider):
id=t.get("id", ""),
name=t.get("name", ""),
slug=t.get("slug", ""),
billing_plan=extract_billing_plan(t),
)
)
if all_teams:
@@ -253,6 +257,7 @@ class VercelProvider(Provider):
user_id=user_id,
username=username,
email=email,
billing_plan=billing_plan,
team=team_info,
teams=all_teams,
)
@@ -17,7 +17,7 @@ MOCK_OLD_PROWLER_VERSION = "0.0.0"
MOCK_PROWLER_MASTER_VERSION = "3.4.0"
def mock_prowler_get_latest_release(_, **kwargs):
def mock_prowler_get_latest_release(_, **_kwargs):
"""Mock requests.get() to get the Prowler latest release"""
response = Response()
response._content = b'[{"name":"3.3.0"}]'
@@ -500,6 +500,32 @@ class Test_Config:
assert load_and_validate_config_file("azure", config_test_file) == {}
assert load_and_validate_config_file("kubernetes", config_test_file) == {}
def test_load_and_validate_config_file_namespaced_non_listed_provider(self):
path = pathlib.Path(os.path.dirname(os.path.realpath(__file__)))
config_test_file = f"{path}/fixtures/config_namespaced_external.yaml"
# github is a built-in not in the legacy hardcoded list; namespaced format must unwrap it.
assert load_and_validate_config_file("github", config_test_file) == {
"token": "abc",
"org": "prowler-cloud",
}
def test_load_and_validate_config_file_namespaced_external_provider(self):
path = pathlib.Path(os.path.dirname(os.path.realpath(__file__)))
config_test_file = f"{path}/fixtures/config_namespaced_external.yaml"
# External plug-in provider: namespaced format must unwrap its block.
assert load_and_validate_config_file("custom_plugin", config_test_file) == {
"setting": "value",
"nested": {"key": 42},
}
def test_load_and_validate_config_file_namespaced_missing_provider(self):
path = pathlib.Path(os.path.dirname(os.path.realpath(__file__)))
config_test_file = f"{path}/fixtures/config_namespaced_external.yaml"
# Provider with no section in a namespaced file must return empty config,
# not the full file (prevents cross-provider config leakage).
assert load_and_validate_config_file("aws", config_test_file) == {}
assert load_and_validate_config_file("gcp", config_test_file) == {}
def test_load_and_validate_config_file_invalid_config_file_path(self, caplog):
provider = "aws"
config_file_path = "invalid/path/to/fixer_config.yaml"
@@ -0,0 +1,8 @@
# Namespaced config covering a non-listed built-in (github) and an external plugin.
github:
token: abc
org: prowler-cloud
custom_plugin:
setting: value
nested:
key: 42
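These three tests pin the unwrapping rule down without showing the implementation. A minimal sketch of the rule they encode (helper name hypothetical; the shipped loader additionally has to decide whether a file is namespaced at all before applying it):

def unwrap_namespaced_config(config: dict, provider: str) -> dict:
    # In a namespaced file every top-level key is a provider name, so a
    # provider receives exactly its own block or an empty dict, never the
    # whole file (which would leak other providers' settings).
    return config.get(provider, {})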
@@ -95,6 +95,38 @@ class TestCheckMetada:
"/path/to/accessanalyzer_enabled/accessanalyzer_enabled.metadata.json"
)
@mock.patch("prowler.lib.check.models.logger")
@mock.patch("prowler.lib.check.models.load_check_metadata")
@mock.patch("prowler.lib.check.models.recover_checks_from_provider")
def test_get_bulk_builtin_wins_on_check_id_collision(
self, mock_recover_checks, mock_load_metadata, mock_logger
):
"""Regression guard: when an entry-point plug-in re-registers a
built-in CheckID, the BUILT-IN metadata wins (first-write-wins) and
the plug-in is IGNORED. The override is surfaced via a warning so
the user knows their plug-in duplicate is being skipped and can
rename it. Matches the precedence in `_resolve_check_module`. See
PR #10700 review (HugoPBrito)."""
# Built-in first, plug-in last (matches recover_checks_from_provider order)
mock_recover_checks.return_value = [
("accessanalyzer_enabled", "/builtin/accessanalyzer_enabled"),
("accessanalyzer_enabled", "/plugin/accessanalyzer_enabled"),
]
builtin_metadata = mock.MagicMock(CheckID="accessanalyzer_enabled")
plugin_metadata = mock.MagicMock(CheckID="accessanalyzer_enabled")
mock_load_metadata.side_effect = [builtin_metadata, plugin_metadata]
result = CheckMetadata.get_bulk(provider="aws")
# Built-in wins (first-write-wins on CheckID), plug-in is ignored
assert result["accessanalyzer_enabled"] is builtin_metadata
# Override is surfaced via warning naming the plug-in metadata file
mock_logger.warning.assert_called_once()
warning_msg = mock_logger.warning.call_args.args[0]
assert "accessanalyzer_enabled" in warning_msg
assert "/plugin/accessanalyzer_enabled" in warning_msg
@mock.patch("prowler.lib.check.models.load_check_metadata")
@mock.patch("prowler.lib.check.models.recover_checks_from_provider")
def test_list(self, mock_recover_checks, mock_load_metadata):
@@ -377,6 +409,50 @@ class TestCheckMetadataValidators:
check_metadata = CheckMetadata(**valid_metadata)
assert check_metadata.Categories == ["encryption", "logging", "secrets"]
def test_valid_vercel_plan_categories_success(self):
"""Test Vercel plan categories are accepted using hyphen-separated names."""
valid_metadata = {
"Provider": "vercel",
"CheckID": "test_check",
"CheckTitle": "Test Check",
"CheckType": [],
"ServiceName": "test",
"SubServiceName": "subtest",
"ResourceIdTemplate": "template",
"Severity": "high",
"ResourceType": "TestResource",
"Description": "Test description",
"Risk": "Test risk",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "test command",
"NativeIaC": "test native",
"Other": "test other",
"Terraform": "test terraform",
},
"Recommendation": {
"Text": "test recommendation",
"Url": "https://hub.prowler.com/check/test_check",
},
},
"Categories": [
"vercel-hobby-plan",
"vercel-pro-plan",
"vercel-enterprise-plan",
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Test notes",
}
check_metadata = CheckMetadata(**valid_metadata)
assert check_metadata.Categories == [
"vercel-hobby-plan",
"vercel-pro-plan",
"vercel-enterprise-plan",
]
def test_valid_category_failure_non_string(self):
"""Test valid category validation fails with non-string category"""
invalid_metadata = {
@@ -454,7 +530,7 @@ class TestCheckMetadataValidators:
with pytest.raises(ValidationError) as exc_info:
CheckMetadata(**invalid_metadata)
assert (
"Categories can only contain lowercase letters, numbers and hyphen"
"Categories can only contain lowercase letters, numbers, and hyphen '-'"
in str(exc_info.value)
)
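The validator message corresponds to a simple character-class rule; a sketch of the validation it describes (the shipped validator may be written differently):

import re

def validate_category(category: str) -> str:
    # Lowercase letters, digits, and hyphens only, per the error message.
    if not isinstance(category, str) or not re.fullmatch(r"[a-z0-9-]+", category):
        raise ValueError(
            "Categories can only contain lowercase letters, numbers, and hyphen '-'"
        )
    return category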
@@ -0,0 +1,110 @@
"""Unit tests for prowler.lib.check.tool_wrapper.
Covers the leaf helper directly (Provider.is_tool_wrapper_provider delegates
to it). Tests the frozenset fast path, the entry-point fallback for external
plug-ins, the broken-plug-in path, the no-match path, and the module-level
cache.
"""
from unittest.mock import MagicMock, patch
import pytest
@pytest.fixture(autouse=True)
def _clear_ep_class_cache():
"""Reset the leaf module's cache between tests so they stay independent."""
from prowler.lib.check import tool_wrapper
tool_wrapper._ep_class_cache.clear()
yield
tool_wrapper._ep_class_cache.clear()
def _make_entry_point(name, cls):
"""Create a mock entry point whose `load()` returns `cls`."""
ep = MagicMock()
ep.name = name
ep.load.return_value = cls
return ep
class TestIsToolWrapperProvider:
"""is_tool_wrapper_provider: frozenset + entry-point fallback."""
@pytest.mark.parametrize("name", ["iac", "llm", "image"])
def test_returns_true_for_builtin_tool_wrappers(self, name):
from prowler.lib.check.tool_wrapper import is_tool_wrapper_provider
assert is_tool_wrapper_provider(name) is True
@pytest.mark.parametrize("name", ["aws", "azure", "gcp", "github", "kubernetes"])
def test_returns_false_for_regular_builtins(self, name):
from prowler.lib.check.tool_wrapper import is_tool_wrapper_provider
assert is_tool_wrapper_provider(name) is False
@patch("prowler.lib.check.tool_wrapper.importlib.metadata.entry_points")
def test_returns_true_for_external_plugin_with_flag(self, mock_eps):
from prowler.lib.check.tool_wrapper import is_tool_wrapper_provider
cls = MagicMock(is_external_tool_provider=True)
mock_eps.return_value = [_make_entry_point("custom_wrapper", cls)]
assert is_tool_wrapper_provider("custom_wrapper") is True
@patch("prowler.lib.check.tool_wrapper.importlib.metadata.entry_points")
def test_returns_false_for_external_plugin_without_flag(self, mock_eps):
from prowler.lib.check.tool_wrapper import is_tool_wrapper_provider
cls = MagicMock(is_external_tool_provider=False)
mock_eps.return_value = [_make_entry_point("vanilla_external", cls)]
assert is_tool_wrapper_provider("vanilla_external") is False
@patch("prowler.lib.check.tool_wrapper.importlib.metadata.entry_points")
def test_returns_false_for_unknown_provider(self, mock_eps):
from prowler.lib.check.tool_wrapper import is_tool_wrapper_provider
mock_eps.return_value = []
assert is_tool_wrapper_provider("does-not-exist") is False
class TestLoadEpClass:
"""_load_ep_class: cache, broken plug-ins, no-match."""
@patch("prowler.lib.check.tool_wrapper.importlib.metadata.entry_points")
def test_caches_result_across_calls(self, mock_eps):
from prowler.lib.check.tool_wrapper import _load_ep_class
cls = MagicMock(is_external_tool_provider=True)
mock_eps.return_value = [_make_entry_point("cached_one", cls)]
first = _load_ep_class("cached_one")
second = _load_ep_class("cached_one")
assert first is cls
assert second is cls
# entry_points consulted only on the first call
assert mock_eps.call_count == 1
@patch("prowler.lib.check.tool_wrapper.importlib.metadata.entry_points")
def test_returns_none_for_broken_plugin(self, mock_eps):
from prowler.lib.check.tool_wrapper import _load_ep_class
broken_ep = MagicMock()
broken_ep.name = "broken"
broken_ep.load.side_effect = ImportError("plug-in is broken")
mock_eps.return_value = [broken_ep]
assert _load_ep_class("broken") is None
@patch("prowler.lib.check.tool_wrapper.importlib.metadata.entry_points")
def test_returns_none_when_no_entry_point_matches(self, mock_eps):
from prowler.lib.check.tool_wrapper import _load_ep_class
cls = MagicMock()
mock_eps.return_value = [_make_entry_point("other_provider", cls)]
assert _load_ep_class("missing_provider") is None
@@ -51,7 +51,7 @@ def mock_provider():
def mock_execute():
with mock.patch("prowler.lib.scan.scan.execute", autospec=True) as mock_exec:
findings = [finding]
mock_exec.side_effect = lambda *args, **kwargs: findings
mock_exec.side_effect = lambda *_args, **_kwargs: findings
yield mock_exec
@@ -264,10 +264,10 @@ class TestScan:
@patch("prowler.lib.scan.scan.update_checks_metadata_with_compliance")
@patch("prowler.lib.scan.scan.Compliance.get_bulk")
@patch("prowler.lib.scan.scan.CheckMetadata.get_bulk")
@patch("prowler.lib.scan.scan.import_check")
@patch("prowler.lib.scan.scan._resolve_check_module")
def test_scan(
self,
mock_import_check,
mock_resolve_check_module,
mock_get_bulk,
mock_compliance_get_bulk,
mock_update_checks_metadata,
@@ -285,7 +285,7 @@ class TestScan:
mock_check_instance.CheckTitle = "Check if IAM Access Analyzer is enabled"
mock_check_instance.Categories = []
mock_import_check.return_value = MagicMock(
mock_resolve_check_module.return_value = MagicMock(
accessanalyzer_enabled=mock_check_class
)
@@ -100,7 +100,7 @@ class TestCloudTrailTimeline:
assert len(result) == 1
assert result[0]["event_name"] == "RunInstances"
assert result[0]["actor"] == "admin"
assert result[0]["actor"] == "user/admin"
assert result[0]["source_ip_address"] == "203.0.113.1"
def test_get_resource_timeline_with_resource_uid(
@@ -304,14 +304,28 @@ class TestExtractActor:
"arn": "arn:aws:iam::123456789012:user/alice",
"userName": "alice",
}
assert CloudTrailTimeline._extract_actor(user_identity) == "alice"
assert CloudTrailTimeline._extract_actor(user_identity) == "user/alice"
def test_extract_actor_assumed_role(self):
user_identity = {
"type": "AssumedRole",
"arn": "arn:aws:sts::123456789012:assumed-role/MyRole/session-name",
}
assert CloudTrailTimeline._extract_actor(user_identity) == "MyRole"
assert (
CloudTrailTimeline._extract_actor(user_identity)
== "assumed-role/MyRole/session-name"
)
def test_extract_actor_assumed_role_sso(self):
"""SSO sessions store the user identity in the session name."""
user_identity = {
"type": "AssumedRole",
"arn": "arn:aws:sts::123456789012:assumed-role/AWSReservedSSO_AdministratorAccess_abcdef1234567890/user@example.com",
}
assert (
CloudTrailTimeline._extract_actor(user_identity)
== "assumed-role/AWSReservedSSO_AdministratorAccess_abcdef1234567890/user@example.com"
)
def test_extract_actor_root(self):
user_identity = {"type": "Root", "arn": "arn:aws:iam::123456789012:root"}
@@ -327,21 +341,33 @@ class TestExtractActor:
== "elasticloadbalancing.amazonaws.com"
)
def test_extract_actor_fallback_to_principal_id(self):
user_identity = {"type": "Unknown", "principalId": "AROAEXAMPLEID:session"}
assert (
CloudTrailTimeline._extract_actor(user_identity) == "AROAEXAMPLEID:session"
)
def test_extract_actor_unknown(self):
assert CloudTrailTimeline._extract_actor({}) == "Unknown"
def test_extract_actor_username_only_returns_unknown(self):
"""When userIdentity carries only userName/principalId (no arn or
invokedBy), we deliberately return "Unknown" we rely on the ARN
from the upstream service for the actor."""
assert (
CloudTrailTimeline._extract_actor({"type": "IAMUser", "userName": "alice"})
== "Unknown"
)
assert (
CloudTrailTimeline._extract_actor(
{"type": "Unknown", "principalId": "AROAEXAMPLEID:session"}
)
== "Unknown"
)
def test_extract_actor_federated_user(self):
user_identity = {
"type": "FederatedUser",
"arn": "arn:aws:sts::123456789012:federated-user/developer",
}
assert CloudTrailTimeline._extract_actor(user_identity) == "developer"
assert (
CloudTrailTimeline._extract_actor(user_identity)
== "federated-user/developer"
)
class TestParseEvent:
@@ -380,7 +406,7 @@ class TestParseEvent:
assert result is not None
assert result["event_name"] == "RunInstances"
assert result["event_source"] == "ec2.amazonaws.com"
assert result["actor"] == "admin"
assert result["actor"] == "user/admin"
assert result["actor_uid"] == "arn:aws:iam::123456789012:user/admin"
assert result["actor_type"] == "IAMUser"
@@ -424,7 +450,10 @@ class TestParseEvent:
"EventName": "RunInstances",
"EventSource": "ec2.amazonaws.com",
"CloudTrailEvent": {
"userIdentity": {"type": "IAMUser", "userName": "admin"},
"userIdentity": {
"type": "IAMUser",
"arn": "arn:aws:iam::123456789012:user/admin",
},
},
}
timeline = CloudTrailTimeline(session=mock_session)
@@ -432,7 +461,7 @@ class TestParseEvent:
assert result is not None
assert result["event_name"] == "RunInstances"
assert result["actor"] == "admin"
assert result["actor"] == "user/admin"
def test_parse_event_missing_event_id(self, mock_session):
"""Test parsing event without EventId returns None (event_id is required)."""
@@ -506,7 +535,7 @@ class TestParseEvent:
assert result is not None
assert result["event_name"] == "RunInstances"
assert result["actor"] == "admin"
assert result["actor"] == "user/admin"
# actor_type should be None when not present in userIdentity
assert result["actor_type"] is None
@@ -0,0 +1,280 @@
from unittest import mock
import botocore
from botocore.exceptions import ClientError
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
make_api_call = botocore.client.BaseClient._make_api_call
PROMPT_ARN = (
f"arn:aws:bedrock:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:prompt/test-prompt-id"
)
def mock_make_api_call_list_prompts_access_denied(self, operation_name, kwarg):
"""Mock API call where ListPrompts fails with AccessDeniedException."""
if operation_name == "ListPrompts":
raise ClientError(
{
"Error": {
"Code": "AccessDeniedException",
"Message": "User is not authorized to perform: bedrock:ListPrompts",
}
},
operation_name,
)
return make_api_call(self, operation_name, kwarg)
def mock_make_api_call_with_prompts(self, operation_name, kwarg):
"""Mock API call that returns prompts."""
if operation_name == "ListPrompts":
return {
"promptSummaries": [
{
"id": "test-prompt-id",
"name": "test-prompt",
"arn": PROMPT_ARN,
}
]
}
return make_api_call(self, operation_name, kwarg)
def mock_make_api_call_with_multiple_prompts(self, operation_name, kwarg):
"""Mock API call that returns multiple prompts."""
if operation_name == "ListPrompts":
return {
"promptSummaries": [
{
"id": "test-prompt-id-1",
"name": "test-prompt-1",
"arn": f"arn:aws:bedrock:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:prompt/test-prompt-id-1",
},
{
"id": "test-prompt-id-2",
"name": "test-prompt-2",
"arn": f"arn:aws:bedrock:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:prompt/test-prompt-id-2",
},
{
"id": "test-prompt-id-3",
"name": "test-prompt-3",
"arn": f"arn:aws:bedrock:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:prompt/test-prompt-id-3",
},
]
}
return make_api_call(self, operation_name, kwarg)
def mock_make_api_call_no_prompts(self, operation_name, kwarg):
"""Mock API call that returns no prompts."""
if operation_name == "ListPrompts":
return {"promptSummaries": []}
return make_api_call(self, operation_name, kwarg)
class Test_bedrock_prompt_management_exists:
@mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_no_prompts,
)
@mock_aws
def test_no_prompts(self):
"""Test FAIL when no prompts exist in the region."""
from prowler.providers.aws.services.bedrock.bedrock_service import BedrockAgent
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.bedrock.bedrock_prompt_management_exists.bedrock_prompt_management_exists.bedrock_agent_client",
new=BedrockAgent(aws_provider),
),
):
from prowler.providers.aws.services.bedrock.bedrock_prompt_management_exists.bedrock_prompt_management_exists import (
bedrock_prompt_management_exists,
)
check = bedrock_prompt_management_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"No Bedrock Prompt Management prompts exist in region {AWS_REGION_US_EAST_1}."
)
assert result[0].resource_id == "prompt-management"
assert result[0].region == AWS_REGION_US_EAST_1
assert (
result[0].resource_arn
== f"arn:aws:bedrock:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:prompt-management"
)
@mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_with_prompts,
)
@mock_aws
def test_prompts_exist(self):
"""Test PASS when prompts exist in the region."""
from prowler.providers.aws.services.bedrock.bedrock_service import BedrockAgent
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.bedrock.bedrock_prompt_management_exists.bedrock_prompt_management_exists.bedrock_agent_client",
new=BedrockAgent(aws_provider),
),
):
from prowler.providers.aws.services.bedrock.bedrock_prompt_management_exists.bedrock_prompt_management_exists import (
bedrock_prompt_management_exists,
)
check = bedrock_prompt_management_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Bedrock Prompt Management prompt test-prompt exists in region {AWS_REGION_US_EAST_1}."
)
assert result[0].resource_id == "test-prompt-id"
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_arn == PROMPT_ARN
@mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_with_multiple_prompts,
)
@mock_aws
def test_multiple_prompts_exist(self):
"""Test PASS with one finding per prompt when multiple prompts exist."""
from prowler.providers.aws.services.bedrock.bedrock_service import BedrockAgent
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.bedrock.bedrock_prompt_management_exists.bedrock_prompt_management_exists.bedrock_agent_client",
new=BedrockAgent(aws_provider),
),
):
from prowler.providers.aws.services.bedrock.bedrock_prompt_management_exists.bedrock_prompt_management_exists import (
bedrock_prompt_management_exists,
)
check = bedrock_prompt_management_exists()
result = check.execute()
assert len(result) == 3
for index, finding in enumerate(result, start=1):
expected_name = f"test-prompt-{index}"
expected_id = f"test-prompt-id-{index}"
assert finding.status == "PASS"
assert (
finding.status_extended
== f"Bedrock Prompt Management prompt {expected_name} exists in region {AWS_REGION_US_EAST_1}."
)
assert finding.resource_id == expected_id
assert finding.region == AWS_REGION_US_EAST_1
assert (
finding.resource_arn
== f"arn:aws:bedrock:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:prompt/{expected_id}"
)
@mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_no_prompts,
)
@mock_aws
def test_no_prompts_multiple_regions(self):
"""Test FAIL in multiple regions when no prompts exist."""
from prowler.providers.aws.services.bedrock.bedrock_service import BedrockAgent
aws_provider = set_mocked_aws_provider(
[AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1]
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.bedrock.bedrock_prompt_management_exists.bedrock_prompt_management_exists.bedrock_agent_client",
new=BedrockAgent(aws_provider),
),
):
from prowler.providers.aws.services.bedrock.bedrock_prompt_management_exists.bedrock_prompt_management_exists import (
bedrock_prompt_management_exists,
)
check = bedrock_prompt_management_exists()
result = check.execute()
assert len(result) == 2
for finding in result:
assert finding.status == "FAIL"
assert (
finding.status_extended
== f"No Bedrock Prompt Management prompts exist in region {finding.region}."
)
assert finding.resource_id == "prompt-management"
assert (
finding.resource_arn
== f"arn:aws:bedrock:{finding.region}:{AWS_ACCOUNT_NUMBER}:prompt-management"
)
regions = {finding.region for finding in result}
assert regions == {AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1}
@mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_list_prompts_access_denied,
)
@mock_aws
def test_list_prompts_client_error_skips_region(self):
"""Test that regions where ListPrompts fails produce no findings."""
from prowler.providers.aws.services.bedrock.bedrock_service import BedrockAgent
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.bedrock.bedrock_prompt_management_exists.bedrock_prompt_management_exists.bedrock_agent_client",
new=BedrockAgent(aws_provider),
),
):
from prowler.providers.aws.services.bedrock.bedrock_prompt_management_exists.bedrock_prompt_management_exists import (
bedrock_prompt_management_exists,
)
check = bedrock_prompt_management_exists()
result = check.execute()
assert result == []
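The four check tests above pin execute down end to end; a sketch consistent with them, assuming a typed prompt model with id/name/arn/region attributes and the usual Prowler report API (the shipped check may differ):

def execute(self):
    findings = []
    for region in bedrock_agent_client.prompt_scanned_regions:
        region_prompts = [
            prompt
            for prompt in bedrock_agent_client.prompts.values()
            if prompt.region == region
        ]
        for prompt in region_prompts:
            report = Check_Report_AWS(metadata=self.metadata(), resource=prompt)
            report.status = "PASS"
            report.status_extended = (
                f"Bedrock Prompt Management prompt {prompt.name} "
                f"exists in region {region}."
            )
            findings.append(report)
        if not region_prompts:
            # Synthetic region-level finding: listing succeeded, zero prompts.
            report = Check_Report_AWS(metadata=self.metadata(), resource={})
            report.region = region
            report.resource_id = "prompt-management"
            report.resource_arn = (
                f"arn:aws:bedrock:{region}:"
                f"{bedrock_agent_client.audited_account}:prompt-management"
            )
            report.status = "FAIL"
            report.status_extended = (
                f"No Bedrock Prompt Management prompts exist in region {region}."
            )
            findings.append(report)
    return findings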
@@ -341,3 +341,125 @@ class TestBedrockAgentPagination:
# Verify paginator was used
regional_client.get_paginator.assert_called_once_with("list_agents")
paginator.paginate.assert_called_once()
class TestBedrockPromptPagination:
"""Test suite for Bedrock Prompt pagination logic."""
def test_list_prompts_pagination(self):
"""Test that list_prompts iterates through all pages."""
# Mock the audit_info
audit_info = MagicMock()
audit_info.audited_partition = "aws"
audit_info.audited_account = "123456789012"
audit_info.audit_resources = None
# Mock the regional client
regional_client = MagicMock()
regional_client.region = "us-east-1"
# Mock paginator
paginator = MagicMock()
page1 = {
"promptSummaries": [
{
"id": "prompt-1",
"name": "prompt-name-1",
"arn": "arn:aws:bedrock:us-east-1:123456789012:prompt/prompt-1",
}
]
}
page2 = {
"promptSummaries": [
{
"id": "prompt-2",
"name": "prompt-name-2",
"arn": "arn:aws:bedrock:us-east-1:123456789012:prompt/prompt-2",
}
]
}
paginator.paginate.return_value = [page1, page2]
regional_client.get_paginator.return_value = paginator
# Initialize service and inject mock client
bedrock_agent_service = BedrockAgent(audit_info)
bedrock_agent_service.regional_clients = {"us-east-1": regional_client}
bedrock_agent_service.prompts = {} # Clear init side effects
bedrock_agent_service.prompt_scanned_regions = set()
# Run method
bedrock_agent_service._list_prompts(regional_client)
# Assertions
assert len(bedrock_agent_service.prompts) == 2
assert (
"arn:aws:bedrock:us-east-1:123456789012:prompt/prompt-1"
in bedrock_agent_service.prompts
)
assert (
"arn:aws:bedrock:us-east-1:123456789012:prompt/prompt-2"
in bedrock_agent_service.prompts
)
assert "us-east-1" in bedrock_agent_service.prompt_scanned_regions
# Verify paginator was used
regional_client.get_paginator.assert_called_once_with("list_prompts")
paginator.paginate.assert_called_once()
def test_list_prompts_ignores_audit_resources_filter(self):
"""Prompt collection is region-scoped and must ignore audit_resources."""
audit_info = MagicMock()
audit_info.audited_partition = "aws"
audit_info.audited_account = "123456789012"
audit_info.audit_resources = ["arn:aws:s3:::unrelated-resource"]
regional_client = MagicMock()
regional_client.region = "us-east-1"
paginator = MagicMock()
paginator.paginate.return_value = [
{
"promptSummaries": [
{
"id": "prompt-1",
"name": "prompt-name-1",
"arn": "arn:aws:bedrock:us-east-1:123456789012:prompt/prompt-1",
}
]
}
]
regional_client.get_paginator.return_value = paginator
bedrock_agent_service = BedrockAgent(audit_info)
bedrock_agent_service.regional_clients = {"us-east-1": regional_client}
bedrock_agent_service.prompts = {}
bedrock_agent_service.prompt_scanned_regions = set()
bedrock_agent_service._list_prompts(regional_client)
assert len(bedrock_agent_service.prompts) == 1
assert "us-east-1" in bedrock_agent_service.prompt_scanned_regions
def test_list_prompts_error_does_not_mark_region_scanned(self):
"""If ListPrompts raises, the region must not be added to prompt_scanned_regions."""
audit_info = MagicMock()
audit_info.audited_partition = "aws"
audit_info.audited_account = "123456789012"
audit_info.audit_resources = None
regional_client = MagicMock()
regional_client.region = "us-east-1"
paginator = MagicMock()
paginator.paginate.side_effect = Exception("ListPrompts failed")
regional_client.get_paginator.return_value = paginator
bedrock_agent_service = BedrockAgent(audit_info)
bedrock_agent_service.regional_clients = {"us-east-1": regional_client}
bedrock_agent_service.prompts = {}
bedrock_agent_service.prompt_scanned_regions = set()
bedrock_agent_service._list_prompts(regional_client)
assert bedrock_agent_service.prompts == {}
assert bedrock_agent_service.prompt_scanned_regions == set()
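The three pagination tests above constrain the service method tightly; a sketch of _list_prompts consistent with them (storing the raw summary dict here, though the shipped code likely uses a typed model):

from prowler.lib.logger import logger

def _list_prompts(self, regional_client):
    try:
        paginator = regional_client.get_paginator("list_prompts")
        for page in paginator.paginate():
            for prompt in page["promptSummaries"]:
                # Keyed by ARN; audit_resources is deliberately not applied,
                # because prompt collection is region-scoped.
                self.prompts[prompt["arn"]] = prompt
        # Mark the region scanned only on success, so the check can tell
        # "listed, zero prompts" (FAIL) apart from "listing failed" (skip).
        self.prompt_scanned_regions.add(regional_client.region)
    except Exception as error:
        logger.error(
            f"{regional_client.region} -- {error.__class__.__name__}: {error}"
        )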
@@ -7,6 +7,7 @@ from unittest.mock import MagicMock, patch
 import pytest
 
 from prowler.lib.check.models import CheckReportImage
+from prowler.providers.common.provider import Provider
 from prowler.providers.image.exceptions.exceptions import (
     ImageInvalidConfigScannerError,
     ImageInvalidNameError,
@@ -20,7 +21,6 @@ from prowler.providers.image.exceptions.exceptions import (
     ImageScanError,
     ImageTrivyBinaryNotFoundError,
 )
-from prowler.providers.common.provider import Provider
 from prowler.providers.image.image_provider import ImageProvider
 from tests.providers.image.image_fixtures import (
     SAMPLE_IMAGE_SHA,
@@ -345,6 +345,24 @@ class TestImageProvider:
         )
         mock_adapter.list_repositories.assert_called_once()
 
+    @patch("prowler.providers.image.image_provider.create_registry_adapter")
+    def test_test_connection_registry_url_with_https_scheme(self, mock_factory):
+        """Registry URL with https:// scheme is normalised before adapter creation."""
+        mock_adapter = MagicMock()
+        mock_adapter.list_repositories.return_value = ["repo1"]
+        mock_factory.return_value = mock_adapter
+
+        result = ImageProvider.test_connection(image="https://my-registry.example.com")
+
+        assert result.is_connected is True
+        mock_factory.assert_called_once_with(
+            registry_url="my-registry.example.com",
+            username=None,
+            password=None,
+            token=None,
+        )
+        mock_adapter.list_repositories.assert_called_once()
+
     def test_build_status_extended(self):
         """Test status message content for different finding types."""
         provider = _make_provider()
@@ -659,6 +677,27 @@ class TestImageProviderRegistryAuth:
         assert "Docker login" in output
 
 
+class TestStripScheme:
+    @pytest.mark.parametrize(
+        "raw,expected",
+        [
+            ("https://my-registry.example.com", "my-registry.example.com"),
+            ("http://my-registry.example.com", "my-registry.example.com"),
+            ("HTTPS://My-Registry.Example.Com", "My-Registry.Example.Com"),
+            ("Http://localhost:5000", "localhost:5000"),
+            ("my-registry.example.com", "my-registry.example.com"),
+            ("https://", ""),
+            ("https://https://nested.example.com", "https://nested.example.com"),
+            (
+                "ftp://not-a-supported-scheme.example.com",
+                "ftp://not-a-supported-scheme.example.com",
+            ),
+        ],
+    )
+    def test_strip_scheme(self, raw, expected):
+        assert ImageProvider._strip_scheme(raw) == expected
+
+
 class TestExtractRegistry:
     def test_docker_hub_simple(self):
         assert ImageProvider._extract_registry("alpine:3.18") is None
@@ -698,6 +737,24 @@ class TestExtractRegistry:
     def test_bare_image_name(self):
         assert ImageProvider._extract_registry("nginx") is None
 
+    def test_https_scheme_bare_hostname_returns_none(self):
+        """Bare scheme-prefixed hostname has no image path, so no registry is extracted."""
+        assert (
+            ImageProvider._extract_registry("https://my-registry.example.com") is None
+        )
+
+    def test_http_scheme_with_port_stripped(self):
+        assert (
+            ImageProvider._extract_registry("http://localhost:5000/myimage:latest")
+            == "localhost:5000"
+        )
+
+    def test_https_scheme_with_path_stripped(self):
+        assert (
+            ImageProvider._extract_registry("https://ghcr.io/org/image:tag")
+            == "ghcr.io"
+        )
+
 
 class TestIsRegistryUrl:
     def test_bare_ecr_hostname(self):
@@ -728,6 +785,16 @@ class TestIsRegistryUrl:
     def test_dockerhub_namespace(self):
         assert not ImageProvider._is_registry_url("library/alpine")
 
+    def test_https_scheme_bare_hostname(self):
+        assert ImageProvider._is_registry_url("https://my-registry.example.com")
+
+    def test_http_scheme_bare_hostname_with_port(self):
+        assert ImageProvider._is_registry_url("http://my-registry.example.com:5000")
+
+    def test_https_scheme_image_reference_not_registry(self):
+        """A scheme-prefixed full image reference is still an image, not a registry URL."""
+        assert not ImageProvider._is_registry_url("https://ghcr.io/myorg/repo:tag")
+
 
 class TestTestRegistryConnection:
     @patch("prowler.providers.image.image_provider.create_registry_adapter")
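
The `TestStripScheme` table above fully determines the helper's behaviour: at most one leading `http://` or `https://` is removed, the match is case-insensitive, the remainder keeps its original case, and any other scheme (e.g. `ftp://`) passes through untouched. One regex-based way to satisfy that table (a sketch under those assumptions, not necessarily the provider's actual implementation):

import re

# Matches a single leading http:// or https://, case-insensitively.
_SCHEME_RE = re.compile(r"^https?://", re.IGNORECASE)


def strip_scheme(raw: str) -> str:
    # count=1 removes only the outermost prefix, so
    # "https://https://nested.example.com" keeps its inner "https://".
    return _SCHEME_RE.sub("", raw, count=1)


# Spot-checks against the parametrized cases:
assert strip_scheme("HTTPS://My-Registry.Example.Com") == "My-Registry.Example.Com"
assert strip_scheme("https://") == ""
assert strip_scheme("ftp://not-a-supported-scheme.example.com") == (
    "ftp://not-a-supported-scheme.example.com"
)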
@@ -6,90 +6,92 @@ from prowler.providers.kubernetes.services.rbac.rbac_service import Rule
 class TestCheckRolePermissions:
     def test_is_rule_allowing_permissions(self):
         # Define some sample rules, resources, and verbs for testing
         rules = [
             # Rule 1: Allows 'get' and 'list' on 'pods' and 'services'
             Rule(resources=["pods", "services"], verbs=["get", "list"]),
             # Rule 2: Allows 'create' and 'delete' on 'deployments'
             Rule(resources=["deployments"], verbs=["create", "delete"]),
         ]
-        resources = ["pods", "deployments"]
-        verbs = ["get", "create"]
-        assert is_rule_allowing_permissions(rules, resources, verbs)
+        assert is_rule_allowing_permissions(
+            rules, ["pods", "deployments"], ["get", "create"]
+        )
 
     def test_no_permissions(self):
         # Test when there are no rules
-        rules = []
-        resources = ["pods", "deployments"]
-        verbs = ["get", "create"]
-        assert not is_rule_allowing_permissions(rules, resources, verbs)
+        assert not is_rule_allowing_permissions([], ["pods"], ["get"])
 
     def test_no_matching_rules(self):
         # Test when there are rules, but none match the specified resources and verbs
         rules = [
             Rule(resources=["services"], verbs=["get", "list"]),
             Rule(resources=["pods"], verbs=["create", "delete"]),
         ]
-        resources = ["deployments", "configmaps"]
-        verbs = ["get", "create"]
-        assert not is_rule_allowing_permissions(rules, resources, verbs)
+        assert not is_rule_allowing_permissions(
+            rules, ["deployments", "configmaps"], ["get", "create"]
+        )
 
     def test_empty_rules(self):
         # Test when the rules list is empty
-        rules = []
-        resources = ["pods", "deployments"]
-        verbs = ["get", "create"]
-        assert not is_rule_allowing_permissions(rules, resources, verbs)
+        assert not is_rule_allowing_permissions([], ["pods"], ["get"])
 
     def test_empty_resources_and_verbs(self):
         # Test when resources and verbs are empty lists
-        rules = [
-            Rule(resources=["pods"], verbs=["get"]),
-            Rule(resources=["services"], verbs=["list"]),
-        ]
-        resources = []
-        verbs = []
-        assert not is_rule_allowing_permissions(rules, resources, verbs)
+        rules = [Rule(resources=["pods"], verbs=["get"])]
+        assert not is_rule_allowing_permissions(rules, [], [])
 
     def test_matching_rule_with_empty_resources_or_verbs(self):
         # Test when a rule matches, but either resources or verbs are empty
+        rules = [Rule(resources=["pods"], verbs=["get"])]
+        assert not is_rule_allowing_permissions(rules, [], ["get"])
+        assert not is_rule_allowing_permissions(rules, ["pods"], [])
+
+    def test_rule_with_non_matching_api_group(self):
+        rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=["apps"])]
+        assert not is_rule_allowing_permissions(rules, ["pods"], ["get"])
+
+    def test_rule_with_matching_api_group(self):
+        rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=[""])]
+        assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
+
+    def test_default_api_group_is_core(self):
+        rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=None)]
+        assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
+
+    def test_rule_with_empty_api_groups_does_not_match_non_core_request(self):
+        rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=None)]
+        assert not is_rule_allowing_permissions(
+            rules, ["pods"], ["get"], ["admissionregistration.k8s.io"]
+        )
+
+    def test_non_core_rule_does_not_match_without_api_groups_argument(self):
         rules = [
-            Rule(resources=["pods"], verbs=["get"]),
-            Rule(resources=["services"], verbs=["list"]),
+            Rule(
+                resources=["validatingwebhookconfigurations"],
+                verbs=["create"],
+                apiGroups=["admissionregistration.k8s.io"],
+            )
         ]
-        resources = []
-        verbs = ["get"]
-        assert not is_rule_allowing_permissions(rules, resources, verbs)
-        resources = ["pods"]
-        verbs = []
-        assert not is_rule_allowing_permissions(rules, resources, verbs)
+        assert not is_rule_allowing_permissions(
+            rules, ["validatingwebhookconfigurations"], ["create"]
+        )
 
-    def test_rule_with_ignored_api_groups(self):
-        # Test when a rule has apiGroups that are not relevant
+    def test_explicit_non_core_api_group(self):
         rules = [
-            Rule(resources=["pods"], verbs=["get"], apiGroups=["test"]),
-            Rule(resources=["services"], verbs=["list"], apiGroups=["test2"]),
+            Rule(
+                resources=["validatingwebhookconfigurations"],
+                verbs=["create"],
+                apiGroups=["admissionregistration.k8s.io"],
+            )
         ]
-        resources = ["pods"]
-        verbs = ["get"]
-        assert not is_rule_allowing_permissions(rules, resources, verbs)
+        assert is_rule_allowing_permissions(
+            rules,
+            ["validatingwebhookconfigurations"],
+            ["create"],
+            ["admissionregistration.k8s.io"],
+        )
 
-    def test_rule_with_relevant_api_groups(self):
-        # Test when a rule has apiGroups that are relevant
-        rules = [
-            Rule(resources=["pods"], verbs=["get"], apiGroups=["", "v1"]),
-            Rule(resources=["services"], verbs=["list"], apiGroups=["test2"]),
-        ]
-        resources = ["pods"]
-        verbs = ["get"]
-        assert is_rule_allowing_permissions(rules, resources, verbs)
+    def test_rule_with_wildcard_api_group(self):
+        rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=["*"])]
+        assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
+        assert is_rule_allowing_permissions(rules, ["pods"], ["get"], ["apps"])
+
+    def test_rule_with_wildcard_resources(self):
+        rules = [Rule(resources=["*"], verbs=["get"], apiGroups=[""])]
+        assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
+
+    def test_rule_with_wildcard_verbs(self):
+        rules = [Rule(resources=["pods"], verbs=["*"], apiGroups=[""])]
+        assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
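
Taken together, the apiGroup tests specify the matching semantics: a request without an explicit group list asks about the core ("") group; a Rule without apiGroups only covers the core group; "*" wildcards match any group, resource, or verb; and a match requires a non-empty intersection on all three axes for at least one rule. A sketch of logic satisfying every test above (the Rule stand-in is illustrative; the real model lives in rbac_service, and this is not necessarily the SDK's implementation):

from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Rule:
    # Stand-in for the RBAC service model; field names follow the tests.
    resources: List[str] = field(default_factory=list)
    verbs: List[str] = field(default_factory=list)
    apiGroups: Optional[List[str]] = None


def is_rule_allowing_permissions(rules, resources, verbs, api_groups=None):
    # A request without api_groups asks about the core ("") API group.
    requested_groups = set(api_groups) if api_groups is not None else {""}
    for rule in rules:
        # A rule without apiGroups is a core-group rule; "*" covers any group.
        rule_groups = set(rule.apiGroups) if rule.apiGroups is not None else {""}
        if "*" not in rule_groups and not rule_groups & requested_groups:
            continue
        resources_match = "*" in rule.resources or set(rule.resources) & set(resources)
        verbs_match = "*" in rule.verbs or set(rule.verbs) & set(verbs)
        if resources_match and verbs_match:
            return True
    return False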

Some files were not shown because too many files have changed in this diff.