Compare commits

...

46 Commits

Author SHA1 Message Date
Alan Buscaglia f0c5621476 test(ui): cover attack path node visual mappings 2026-05-06 16:25:58 +02:00
Alan Buscaglia c220603438 chore(*): fix provider node naming 2026-05-06 15:57:07 +02:00
Alan Buscaglia 331088b8e6 fix(ui): simplify attack path graph clicks
- remove graph node action and details flows
- toggle findings directly from resource clicks
- keep finding details as the only graph detail view
2026-05-06 11:56:42 +02:00
Alan Buscaglia 7559f04287 fix(ui): harden attack path graph interactions
- preserve graph edges and hidden-finding visibility rules
- align workflow scripts and graph legend behavior
- strengthen browser and unit regressions for graph flows
2026-05-06 11:16:14 +02:00
Alan Buscaglia b84c66abfd fix(ui): use horizontal attack path graph layout 2026-05-06 08:17:32 +02:00
Alan Buscaglia 435010fe9a fix(ui): avoid reserving attack path finding space 2026-05-06 01:05:49 +02:00
Alan Buscaglia 6be12a2eb6 fix(ui): improve attack paths graph layout highlights 2026-05-05 23:08:24 +02:00
Alan Buscaglia fc7e0c85e4 feat(ui): expand attack paths node legend coverage 2026-05-05 21:54:39 +02:00
Alan Buscaglia 33ad74b53c feat(ui): improve attack paths graph exploration 2026-05-05 21:28:26 +02:00
Alan Buscaglia c5aa4ced0d feat(ui): render attack paths node icons
- Add badge-style graph nodes with wrapped labels
- Map provider, resource, identity, and finding icons
- Align graph layout and handles with badge nodes
- Cover icon rendering and visual metadata with tests
2026-05-05 20:47:13 +02:00
Alan Buscaglia 58b0fa556d feat(ui): add attack paths node visual mapper
- Add typed resolver for graph node visual metadata
- Reuse existing service icons for known resource labels
- Cover exact, alias, and fallback mappings with tests
2026-05-05 20:00:22 +02:00
Alan Buscaglia aa311623fe refactor(ui): simplify attack paths graph interactions
- Reuse shared measured-fit scheduling for graph viewport updates
- Consolidate node action dialog state
- Tighten browser harness dialog detection
2026-05-05 19:25:00 +02:00
Alan Buscaglia 142b45a387 fix(ui): improve attack paths graph interactions
- Restore supported graph scroll zoom behavior
- Add node action selector for ambiguous resource clicks
- Open finding and node details in existing drawers
- Cover resource actions with browser tests
2026-05-05 19:13:04 +02:00
Pablo F.G ec102d1569 fix(ui): re-fit attack-path graph when expand reveals off-screen findings
Restore the expand-time auto-fit that was lost in the earlier
filter-toggle fix. Hidden findings are not measured by React Flow
on initial render, so `fitViewOptions.includeHiddenNodes` cannot
extend the initial viewport to cover them: clicking a resource
whose findings are laid out beyond the framed area would leave the
user with empty space and no way to discover the newly revealed
nodes.

The expand-fit only fires for resources that just transitioned from
collapsed to expanded, and only when at least one of the connected
findings sits entirely past the current viewport (full bounding box
beyond the edge). Partially clipped edge nodes are left alone so
the framing the user already has is preserved when nothing has
actually moved off-screen.

Auto-fits now use asymmetric padding (extra room on the right and
bottom) to keep the minimap clear after a fit. Without it, fitted
nodes could land underneath the bottom-right minimap and become
unclickable.

The expand-without-re-fit test was a lock-in for the previous
behaviour and is replaced with one asserting the new behaviour:
expanding resources whose findings sit off-screen re-fits the
viewport.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 17:28:08 +02:00
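
A minimal sketch of the off-screen test this commit describes, assuming axis-aligned bounding boxes (illustrative Python only; the actual implementation is TypeScript/React Flow, and every name below is hypothetical):

```python
from dataclasses import dataclass


@dataclass
class Rect:
    x: float  # left edge
    y: float  # top edge
    width: float
    height: float

    @property
    def right(self) -> float:
        return self.x + self.width

    @property
    def bottom(self) -> float:
        return self.y + self.height


def is_fully_off_screen(node: Rect, viewport: Rect) -> bool:
    """True only when the node's whole bounding box lies past a viewport
    edge; partially clipped nodes return False, preserving the framing
    the user already has."""
    return (
        node.x >= viewport.right
        or node.right <= viewport.x
        or node.y >= viewport.bottom
        or node.bottom <= viewport.y
    )


def should_refit(just_expanded: bool, findings: list[Rect], viewport: Rect) -> bool:
    # Fire only on a collapsed -> expanded transition, and only when at
    # least one connected finding sits entirely past the current viewport.
    return just_expanded and any(is_fully_off_screen(f, viewport) for f in findings)
```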
Pablo F.G d26f455784 fix(ui): re-fit attack-path graph on filter toggle and harden minimap
The Attack Path graph now re-fits its viewport when the user enters
the filtered view (click on a finding) or returns to the full graph
("Back to Full View"), so the focused subgraph and the restored
full graph are always centered instead of leaving the viewport
pointing at the previous coordinates. Resource expansion no longer
re-fits — the initial fit already includes hidden findings, so newly
revealed nodes sit inside the framing the user already has.

The minimap viewport indicator (mask cut-out) is darkened and given
a thicker border to stand out against the dark theme, where it was
previously hard to see.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 16:05:57 +02:00
Pablo F.G 4d5a77a58a chore(openspec): stop tracking openspec as submodule
Detach the openspec submodule so the directory is managed as a
local clone instead. /openspec/ remains in .gitignore so the cloned
working tree is never tracked by this repo.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 14:35:50 +02:00
Pablo F.G c183d5e868 fix: format 2026-05-05 14:18:51 +02:00
Pablo F.G 74e5118646 Merge remote-tracking branch 'origin/PROWLER-1273/react-flow-migration' into PROWLER-1273/react-flow-migration 2026-05-05 13:48:03 +02:00
Pablo F.G 48882b553f Merge remote-tracking branch 'origin/master' into PROWLER-1273/react-flow-migration 2026-05-05 13:41:30 +02:00
Pablo Fernandez Guerra (PFE) 8acbddd125 [CHAIN] test(ui): add Vitest Browser test coverage for Attack Paths (#10970)
Co-authored-by: Pablo F.G <pablo.fernandez@prowler.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-05 13:38:42 +02:00
Prowler Bot 786059bfb2 chore(docs): Bump version to v5.25.2 (#10993)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-05-05 10:45:07 +02:00
Pablo F.G 3d4f5e66ab Merge remote-tracking branch 'origin/master' into PROWLER-1273/react-flow-migration
# Conflicts:
#	ui/CHANGELOG.md
#	ui/dependency-log.json
2026-05-05 09:21:51 +02:00
Pablo Fernandez Guerra (PFE) a4fc230cf4 [CHAIN] feat(ui): add graph export, minimap and fullscreen polish (#10800)
Co-authored-by: Pablo F.G <pablo.fernandez@prowler.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-05 09:16:29 +02:00
Pablo Fernandez Guerra (PFE) 1d54244f2b [CHAIN] feat(ui): add graph interactions and filtered view (#10756)
Co-authored-by: Pablo F.G <pablo.fernandez@prowler.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-05 09:00:09 +02:00
Pablo Fernandez Guerra (PFE) ff2bf5b01d [CHAIN] refactor(ui): replace D3 graph rendering with React Flow (#10705)
Co-authored-by: Pablo F.G <pablo.fernandez@prowler.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-05 08:55:47 +02:00
Pepe Fagoaga 703a33108c chore(changelog): prepare for v5.25.2 (#10991) 2026-05-05 08:47:28 +02:00
Pepe Fagoaga 7c6d658154 fix(k8s): match RBAC rules by apiGroup, not just core (#10969)
Co-authored-by: Andoni A. <14891798+andoniaf@users.noreply.github.com>
2026-05-04 19:54:03 +02:00
Pepe Fagoaga 21d7d08b4b fix(timeline): Return a compact actor name from CloudTrail events (#10986) 2026-05-04 19:39:17 +02:00
Pepe Fagoaga f314725f4d fix(k8s): deduplicate RBAC findings by unique subject (#10242)
Co-authored-by: Andoni A. <14891798+andoniaf@users.noreply.github.com>
2026-05-04 18:11:38 +02:00
Rubén De la Torre Vico 02f43a7ad6 docs: add Prowler Studio page and remove check-kreator pages (#10981) 2026-05-04 17:51:02 +02:00
Daniel Barranquero 0dd8981ee4 feat: add issue template for creating new checks (#10976) 2026-05-04 17:47:39 +02:00
Rubén De la Torre Vico 269e51259d docs: add troubleshooting guide for stuck scans after worker crash (#10938) 2026-05-04 17:24:09 +02:00
Hugo Pereira Brito f4afdf0541 chore(ui): decrement changelog entry version to 1.25.2 (#10974) 2026-05-04 14:59:27 +01:00
Hugo Pereira Brito 652cb69216 fix(ui): compliance card layout polish (#10939) 2026-05-04 12:59:06 +01:00
Daniel Barranquero 921f49a0de feat(aws): add bedrock_prompt_management_exists security check (#10878) 2026-05-04 12:38:15 +02:00
Hugo Pereira Brito 6cb770fcc8 fix(ui): clean up findings expanded resource row layout (#10949) 2026-05-04 11:17:54 +01:00
Daniel Barranquero 86449fb99d chore(vercel): add disclaimer for checks depending on billing plan (#10663) 2026-05-04 08:56:50 +02:00
Andoni Alonso 40dd0e640b fix(sdk): strip http(s):// scheme from image registry URLs (#10950) 2026-05-04 08:37:46 +02:00
Pablo Fernandez Guerra (PFE) ba84b23afb [CHAIN] refactor(ui): normalize graph edge types and remove dead code (#10701)
Co-authored-by: Pablo F.G <pablo.fernandez@prowler.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-22 16:29:29 +02:00
Pablo F.G a9427c8024 chore(openspec): bump submodule to include PR4 test coverage proposal
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-17 15:29:57 +02:00
Pablo F.G 10a62a6850 Merge remote-tracking branch 'origin/master' into PROWLER-1273/react-flow-migration 2026-04-17 14:27:53 +02:00
Pablo F.G b4601abb4e chore(openspec): consolidate submodule to include all chain task and spec updates
Bumps the openspec submodule to incorporate the linearized task completion
status and spec updates from PR0 (1373), PR1 (1374), and PR2 (1375).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-17 14:27:14 +02:00
Pablo F.G 5c981f5683 chore: update openspec with restructured expect-cli tests
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 16:49:47 +02:00
Pablo F.G 9922b15391 chore: update openspec with expect-cli validation tasks
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 16:43:48 +02:00
Pablo F.G 6e77abea01 chore: update openspec submodule with react-flow-migration proposal
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 16:31:24 +02:00
Pablo F.G 4d57f3bef1 chore: add prowler-openspec-opensource as git submodule
Adds the openspec repository as a submodule at openspec/ for shared
spec definitions used by SDD tooling across AI coding assistants.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 11:56:09 +02:00
147 changed files with 8987 additions and 2785 deletions
@@ -0,0 +1,143 @@
name: "🔎 New Check Request"
description: Request a new Prowler security check
title: "[New Check]: "
labels: ["feature-request", "status/needs-triage"]
body:
- type: checkboxes
id: search
attributes:
label: Existing check search
description: Confirm this check does not already exist before opening a new request.
options:
- label: I have searched existing issues, Prowler Hub, and the public roadmap, and this check does not already exist.
required: true
- type: markdown
attributes:
value: |
Use this form to describe the security condition that Prowler should evaluate.
The most useful inputs for [Prowler Studio](https://github.com/prowler-cloud/prowler-studio) are:
- What should be detected
- What PASS and FAIL mean
- Vendor docs, API references, SDK methods, CLI commands, or reference code
- type: dropdown
id: provider
attributes:
label: Provider
description: Cloud or platform this check targets.
options:
- AWS
- Azure
- GCP
- Kubernetes
- GitHub
- Microsoft 365
- OCI
- Alibaba Cloud
- Cloudflare
- MongoDB Atlas
- Google Workspace
- OpenStack
- Vercel
- NHN
- Other / New provider
validations:
required: true
- type: input
id: other_provider_name
attributes:
label: New provider name
description: Only fill this if you selected "Other / New provider" above.
placeholder: "NewProviderName"
validations:
required: false
- type: input
id: service_name
attributes:
label: Service or product area
description: Optional. Main service, product, or feature to audit.
placeholder: "s3, bedrock, entra, repository, apiserver"
validations:
required: false
- type: input
id: suggested_check_name
attributes:
label: Suggested check name
description: Optional. Use `snake_case` following `<service>_<resource>_<best_practice>`, with lowercase letters and underscores only.
placeholder: "bedrock_guardrail_sensitive_information_filter_enabled"
validations:
required: false
- type: textarea
id: context
attributes:
label: Context and goal
description: Describe the security problem, why it matters, and what this new check should help detect.
placeholder: |-
- Security condition to validate:
- Why it matters:
- Resource, feature, or configuration involved:
validations:
required: true
- type: textarea
id: expected_behavior
attributes:
label: Expected behavior
description: Explain what the check should evaluate and what PASS, FAIL, or MANUAL should mean.
placeholder: |-
- Resource or scope to evaluate:
- PASS when:
- FAIL when:
- MANUAL when (if applicable):
- Exclusions, thresholds, or edge cases:
validations:
required: true
- type: textarea
id: references
attributes:
label: References
description: Add vendor docs, API references, SDK methods, CLI commands, endpoint docs, sample payloads, or similar reference material.
placeholder: |-
- Product or service documentation:
- API or SDK reference:
- CLI command or endpoint documentation:
- Sample payload or response:
- Security advisory or benchmark:
validations:
required: true
- type: dropdown
id: severity
attributes:
label: Suggested severity
description: Your best estimate. Reviewers will confirm during triage.
options:
- Critical
- High
- Medium
- Low
- Informational
- Not sure
validations:
required: true
- type: textarea
id: implementation_notes
attributes:
label: Additional implementation notes
description: Optional. Add permissions, unsupported regions, config knobs, product limitations, or anything else that may affect implementation.
placeholder: |-
- Required permissions or scopes:
- Region, tenant, or subscription limitations:
- Configurable behavior or thresholds:
- Other constraints:
validations:
required: false
+29 -9
@@ -1,14 +1,14 @@
name: 'UI: Tests'
name: "UI: Tests"
on:
push:
branches:
- 'master'
- 'v5.*'
- "master"
- "v5.*"
pull_request:
branches:
- 'master'
- 'v5.*'
- "master"
- "v5.*"
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
@@ -16,7 +16,7 @@ concurrency:
env:
UI_WORKING_DIR: ./ui
NODE_VERSION: '24.13.0'
NODE_VERSION: "24.13.0"
permissions: {}
@@ -42,6 +42,8 @@ jobs:
fonts.gstatic.com:443
api.github.com:443
release-assets.githubusercontent.com:443
cdn.playwright.dev:443
objects.githubusercontent.com:443
- name: Checkout repository
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
@@ -133,7 +135,7 @@ jobs:
if: steps.check-changes.outputs.any_changed == 'true' && steps.critical-changes.outputs.any_changed == 'true'
run: |
echo "Critical paths changed - running ALL unit tests"
pnpm run test:run
pnpm run test:unit
- name: Run unit tests (related to changes only)
if: steps.check-changes.outputs.any_changed == 'true' && steps.critical-changes.outputs.any_changed != 'true' && steps.changed-source.outputs.all_changed_files != ''
@@ -142,7 +144,7 @@ jobs:
echo "${STEPS_CHANGED_SOURCE_OUTPUTS_ALL_CHANGED_FILES}"
# Convert space-separated to vitest related format (remove ui/ prefix for relative paths)
CHANGED_FILES=$(echo "${STEPS_CHANGED_SOURCE_OUTPUTS_ALL_CHANGED_FILES}" | tr ' ' '\n' | sed 's|^ui/||' | tr '\n' ' ')
pnpm exec vitest related $CHANGED_FILES --run
pnpm exec vitest related $CHANGED_FILES --run --project unit
env:
STEPS_CHANGED_SOURCE_OUTPUTS_ALL_CHANGED_FILES: ${{ steps.changed-source.outputs.all_changed_files }}
@@ -150,7 +152,25 @@ jobs:
if: steps.check-changes.outputs.any_changed == 'true' && steps.critical-changes.outputs.any_changed != 'true' && steps.changed-source.outputs.all_changed_files == ''
run: |
echo "Only test files changed - running ALL unit tests"
pnpm run test:run
pnpm run test:unit
- name: Cache Playwright browsers
if: steps.check-changes.outputs.any_changed == 'true'
id: playwright-cache
uses: actions/cache@668228422ae6a00e4ad889ee87cd7109ec5666a7 # v5.0.4
with:
path: ~/.cache/ms-playwright
key: ${{ runner.os }}-playwright-chromium-${{ hashFiles('ui/pnpm-lock.yaml') }}
restore-keys: |
${{ runner.os }}-playwright-chromium-
- name: Install Playwright Chromium browser
if: steps.check-changes.outputs.any_changed == 'true' && steps.playwright-cache.outputs.cache-hit != 'true'
run: pnpm exec playwright install chromium
- name: Run browser tests
if: steps.check-changes.outputs.any_changed == 'true'
run: pnpm run test:browser
- name: Build application
if: steps.check-changes.outputs.any_changed == 'true'
+1 -1
@@ -6,7 +6,7 @@ All notable changes to the **Prowler API** are documented in this file.
### 🚀 Added
- New `scan-reset-ephemeral-resources` post-scan task zeroes `failed_findings_count` for resources missing from the latest full-scope scan, keeping ephemeral resources from polluting the Resources page sort [(#10929)](https://github.com/prowler-cloud/prowler/pull/10929)
- `scan-reset-ephemeral-resources` post-scan task zeroes `failed_findings_count` for resources missing from the latest full-scope scan, keeping ephemeral resources from polluting the Resources page sort [(#10929)](https://github.com/prowler-cloud/prowler/pull/10929)
---
@@ -215,3 +215,6 @@ Also is important to keep all code examples as short as possible, including the
| e5 | M365 and Azure Entra checks enabled by or dependent on an E5 license (e.g., advanced threat protection, audit, DLP, and eDiscovery) |
| privilege-escalation | Detects IAM policies or permissions that allow identities to elevate their privileges beyond their intended scope, potentially gaining administrator or higher-level access through specific action combinations |
| ec2-imdsv1 | Identifies EC2 instances using Instance Metadata Service version 1 (IMDSv1), which is vulnerable to SSRF attacks and should be replaced with IMDSv2 for enhanced security |
| vercel-hobby-plan | Vercel checks whose audited feature is available on the Hobby plan (and therefore also on Pro and Enterprise plans) |
| vercel-pro-plan | Vercel checks whose audited feature requires a Pro plan or higher, including features also available on Enterprise or via supported paid add-ons for Pro plans |
| vercel-enterprise-plan | Vercel checks whose audited feature requires the Enterprise plan |
+20 -6
@@ -27,14 +27,28 @@ The most common high level steps to create a new check are:
### Naming Format for Checks
Checks must be named following the format: `service_subservice_resource_action`.
If you already know the check name when creating a request or implementing a check, use a descriptive identifier with lowercase letters and underscores only.
Recommended patterns:
- `<service>_<resource>_<best_practice>`
The name components are:
- `service` The main service being audited (e.g., ec2, entra, iam, etc.)
- `subservice` An individual component or subset of functionality within the service that is being audited. This may correspond to a shortened version of the class attribute accessed within the check. If there is no subservice, just omit.
- `resource` The specific resource type being evaluated (e.g., instance, policy, role, etc.)
- `action` The security aspect or configuration being checked (e.g., public, encrypted, enabled, etc.)
- `service` The main service or product area being audited (e.g., ec2, entra, iam, bedrock).
- `resource` The resource, feature, or configuration being evaluated. It can be a single word or a compound phrase joined with underscores (e.g., instance, policy, guardrail, sensitive_information_filter).
- `best_practice` The expected secure state or best practice being checked (e.g., enabled, encrypted, restricted, configured, not_publicly_accessible).
Additional guidance:
- Use underscores only. Do not use hyphens.
- Keep the name specific enough to describe the behavior of the check.
- The first segment should match the service or product area whenever possible.
Examples:
- `s3_bucket_versioning_enabled`
- `bedrock_guardrail_sensitive_information_filter_enabled`
### File Creation
@@ -387,7 +401,7 @@ Provides both code examples and best practice recommendations for addressing the
#### Categories
One or more functional groupings used for execution filtering (e.g., `internet-exposed`). You can define new categories just by adding to this field.
One or more functional groupings used for execution filtering (e.g., `internet-exposed`). Categories must match the predefined values enforced by `CheckMetadata`; adding a new category requires updating the validator and the metadata documentation.
For the complete list of available categories, see [Categories Guidelines](/developer-guide/check-metadata-guidelines#categories-guidelines).
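
The naming rules above are mechanical enough to lint. A minimal sketch, assuming only the documented constraints (lowercase letters, digits, underscores, at least three segments); the helper is hypothetical and is not Prowler's actual `CheckMetadata` validation:

```python
import re

# Lowercase letters, digits, and underscores only, with at least three
# underscore-separated segments: <service>_<resource...>_<best_practice>.
CHECK_NAME_RE = re.compile(r"[a-z0-9]+(?:_[a-z0-9]+){2,}")


def is_valid_check_name(name: str) -> bool:
    """Hypothetical lint for the recommended check naming format."""
    return bool(CHECK_NAME_RE.fullmatch(name))


assert is_valid_check_name("s3_bucket_versioning_enabled")
assert is_valid_check_name("bedrock_guardrail_sensitive_information_filter_enabled")
assert not is_valid_check_name("S3-Bucket-Versioning")  # hyphens and uppercase
```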
+131
@@ -0,0 +1,131 @@
---
title: 'Prowler Studio'
---
**Prowler Studio is an AI workflow that ensures Claude Code follows Prowler's skills, guardrails, and best practices when creating new security checks.** What lands in the resulting pull request is consistent, tested, and ready for human review — not half-correct boilerplate that needs to be rewritten.
<Info>
**Contributor Tool**: Prowler Studio is a workflow for advanced contributors adding new Prowler security checks. It is not part of Prowler Cloud, Prowler App, or Prowler CLI.
</Info>
<Warning>
**Preview Feature**: Prowler Studio is under active development and breaking changes are expected. Please report issues or share feedback on [GitHub](https://github.com/prowler-cloud/prowler-studio/issues) or in the [Slack community](https://goto.prowler.com/slack).
</Warning>
<Card title="Prowler Studio Repository" icon="github" href="https://github.com/prowler-cloud/prowler-studio" horizontal>
Clone the source code, install Prowler Studio, and explore the agent workflow in detail.
</Card>
## The Problem
Adding a new check to [Prowler](https://github.com/prowler-cloud/prowler) is more than writing detection logic. A correct check has to:
- Match Prowler's exact service and check folder structure and naming conventions
- Wire up metadata, severity, remediation, tests, and compliance mappings
- Mirror the patterns used by the hundreds of existing checks in the same provider
- Actually load when Prowler scans for available checks — silent structural mistakes are easy to make
Asking a general-purpose AI assistant to do this usually means guessing. It misses conventions, skips tests, or invents structure that looks right but does not load. The result is a half-correct PR that needs to be reviewed line by line or rewritten.
## The Solution
Prowler Studio enforces the workflow end-to-end. Describe the check once — a markdown ticket, a Jira issue, or a GitHub issue — and the workflow:
1. **Loads Prowler-specific skills into every agent.** Every step starts with the same context an experienced Prowler engineer would have in mind. See [AI Skills System](/developer-guide/ai-skills) for how skills are structured.
2. **Runs specialized agents in sequence.** Implementation → testing → compliance mapping → review → PR creation. Each agent has one job and a tight scope.
3. **Verifies as it goes.** The check must load in Prowler. Tests must pass. If something fails, the agent fixes it and re-runs (up to a bounded number of attempts) before moving on.
4. **Produces a complete pull request.** Branch, passing check, tests, compliance mappings, and a pull request waiting for human review.
The result is a consistent starting point, every time, on every supported provider.
## Quick Start
### Install
Prowler Studio requires [`uv`](https://docs.astral.sh/uv/getting-started/installation/); follow the linked official installation guide to set it up.
```bash
git clone https://github.com/prowler-cloud/prowler-studio
cd prowler-studio
uv sync
source .venv/bin/activate
```
### Describe the Check
A ticket is a structured markdown description of the check to create. It is the only input the workflow needs; every agent (implementation, testing, compliance mapping, review, PR creation) uses it as the source of truth, so the more concrete it is, the closer the first PR will land to the desired outcome.
The ticket can be supplied in three ways:
- **Local markdown file** → `--ticket path/to/ticket.md`
- **Jira issue** → `--jira-url https://...` (uses the issue body)
- **GitHub issue** → `--github-url https://...` (uses the issue body)
The content should follow the **New Check Request** template:
- The local copy at [`check_ticket_template.md`](https://github.com/prowler-cloud/prowler-studio/blob/main/check_ticket_template.md) covers `--ticket` and Jira tickets.
- A prefilled GitHub form is also available: [Create a New Check Request issue](https://github.com/prowler-cloud/prowler/issues/new?template=new-check-request.yml).
Sections marked *Optional* can be skipped; everything else helps the agents make the right decisions.
### Run the Workflow
From a local markdown ticket:
```bash
prowler-studio --ticket check_ticket.md
```
From a Jira ticket:
```bash
prowler-studio --jira-url https://mycompany.atlassian.net/browse/PROJ-123
```
From a GitHub issue:
```bash
prowler-studio --github-url https://github.com/owner/repo/issues/123
```
<Note>
Provide exactly one of `--ticket`, `--jira-url`, or `--github-url`.
</Note>
Keep changes local (no push, no pull request):
```bash
prowler-studio -b feat/my-check --ticket check_ticket.md --local
```
### What You Get
After a successful run the working environment contains:
- A new branch on a clean Prowler worktree containing the check, metadata, tests, and compliance mappings
- A pull request opened against Prowler (skipped with `--local`)
- A timestamped log file under `logs/` capturing every step the agents took
## CLI Options
| Option | Short | Description |
|--------|-------|-------------|
| `--branch` | `-b` | Branch name (default: `feat/<ticket>-<check_name>` or `feat/<check_name>`) |
| `--ticket` | `-t` | Path to a markdown check ticket file |
| `--jira-url` | `-j` | Jira ticket URL (e.g., `https://mycompany.atlassian.net/browse/PROJ-123`) |
| `--github-url` | `-g` | GitHub issue URL (e.g., `https://github.com/owner/repo/issues/123`) |
| `--working-dir` | `-w` | Working directory for the Prowler clone (default: `./working`) |
| `--no-worktree` | | Legacy mode — work directly on the main clone instead of using worktrees |
| `--cleanup-worktree` | | Remove the worktree after a successful pull request is created |
| `--local` | | Keep changes local — skip push and pull request creation |
## Configuration
Set these environment variables depending on the input source:
| Variable | When Needed | Purpose |
|----------|-------------|---------|
| `GITHUB_TOKEN` | `--github-url` (recommended) | Higher GitHub API rate limits and access to private issues |
| `JIRA_SITE_URL` | `--jira-url` | Jira site, e.g. `https://mycompany.atlassian.net` |
| `JIRA_EMAIL` | `--jira-url` | Email of the Jira account used to fetch the ticket |
| `JIRA_API_TOKEN` | `--jira-url` | API token for the Jira account |
+2 -1
@@ -365,7 +365,8 @@
"developer-guide/security-compliance-framework",
"developer-guide/lighthouse-architecture",
"developer-guide/mcp-server",
"developer-guide/ai-skills"
"developer-guide/ai-skills",
"developer-guide/prowler-studio"
]
},
{
@@ -121,8 +121,8 @@ To update the environment file:
Edit the `.env` file and change version values:
```env
PROWLER_UI_VERSION="5.25.1"
PROWLER_API_VERSION="5.25.1"
PROWLER_UI_VERSION="5.25.2"
PROWLER_API_VERSION="5.25.2"
```
<Note>
+34
@@ -159,6 +159,40 @@ When these environment variables are set, the API will use them directly instead
A fix addressing this permission issue is being evaluated in [PR #9953](https://github.com/prowler-cloud/prowler/pull/9953).
</Note>
### Scan Stuck in Executing State After Worker Crash
When running Prowler App via Docker Compose, a scan may remain indefinitely in the `executing` state if the worker process crashes (for example, due to an Out of Memory condition) before it can update the scan status. Since it is not currently possible to cancel a scan in the `executing` state through the UI, the workaround is to manually update the scan record in the database.
**Root Cause:**
The Celery worker process terminates unexpectedly (OOM, node failure, etc.) before transitioning the scan state to `completed` or `failed`. The scan record remains in `executing` with no active process to advance it.
**Solution:**
Connect to the database using the `prowler_admin` user. Due to Row-Level Security (RLS), the default database user cannot see scan records — you must use `prowler_admin`:
```bash
psql -U prowler_admin -d prowler_db
```
Identify the stuck scan by filtering for scans in `executing` state:
```sql
SELECT id, name, state, started_at FROM scans WHERE state = 'executing';
```
Update the scan state to `failed` using the scan ID:
```sql
UPDATE scans SET state = 'failed' WHERE id = '<scan-id>';
```
After this change, the scan will appear as failed in the UI and you can launch a new scan.
<Note>
A feature to cancel executing scans directly from the UI is being tracked in [GitHub Issue #6893](https://github.com/prowler-cloud/prowler/issues/6893).
</Note>
### SAML/OAuth ACS URL Incorrect When Running Behind a Proxy or Load Balancer
See [GitHub Issue #9724](https://github.com/prowler-cloud/prowler/issues/9724) for more details.
@@ -1,47 +0,0 @@
---
title: 'Prowler Check Kreator'
---
<Note>
Currently, this tool is only available for creating checks for the AWS provider.
</Note>
<Note>
If you are looking for a way to create new checks for all the supported providers, you can use [Prowler Studio](https://github.com/prowler-cloud/prowler-studio), an AI-powered toolkit for generating and managing security checks for Prowler and an improved successor to the Check Kreator.
</Note>
## Introduction
**Prowler Check Kreator** is a utility designed to streamline the creation of new checks for Prowler. This tool generates all necessary files required to add a new check to the Prowler repository. Specifically, it creates:
- A dedicated folder for the check.
- The main check script.
- A metadata file with essential details.
- A folder and file structure for testing the check.
## Usage
To use the tool, execute the main script with the following command:
```bash
python util/prowler_check_kreator/prowler_check_kreator.py <prowler_provider> <check_name>
```
Parameters:
- `<prowler_provider>`: Currently only AWS is supported.
- `<check_name>`: The name you wish to assign to the new check.
## AI integration
This tool optionally integrates AI to assist in generating the check code and metadata file content. When AI assistance is chosen, the tool uses [Gemini](https://gemini.google.com/) to produce preliminary code and metadata.
<Note>
For this feature to work, you must have the library `google-generativeai` installed in your Python environment.
</Note>
<Warning>
AI-generated code and metadata might contain errors or require adjustments to align with specific Prowler requirements. Carefully review all AI-generated content before committing.
</Warning>
To enable AI assistance, simply confirm when prompted by the tool. Additionally, ensure that the `GEMINI_API_KEY` environment variable is set with a valid Gemini API key. For instructions on obtaining your API key, refer to the [Gemini documentation](https://ai.google.dev/gemini-api/docs/api-key).
@@ -160,3 +160,25 @@ Prowler for Vercel includes security checks across the following services:
| **Project** | Deployment protection, environment variable security, fork protection, and skew protection |
| **Security** | Web Application Firewall (WAF), rate limiting, IP blocking, and managed rulesets |
| **Team** | SSO enforcement, directory sync, member access, and invitation hygiene |
## Checks With Explicit Plan-Based Behavior
Prowler currently includes 26 Vercel checks. The 11 checks below have explicit billing-plan handling in the provider metadata or check logic. When the scanned scope reports a billing plan, Prowler adds plan-aware context to findings for these checks. If the API does not expose the required configuration, Prowler may return `MANUAL` and require verification in the Vercel dashboard.
| Check ID | Hobby | Pro | Enterprise | Notes |
|----------|-------|-----|------------|-------|
| `project_password_protection_enabled` | Not available | Available as a paid add-on | Available | Checks password protection for deployments |
| `project_production_deployment_protection_enabled` | Not available | Available with supported paid deployment protection options | Available | Checks protection for production deployments |
| `project_skew_protection_enabled` | Not available | Available | Available | Checks skew protection during rollouts |
| `security_custom_rules_configured` | Not available | Available | Available | Returns `MANUAL` when the firewall configuration cannot be assessed from the API |
| `security_ip_blocking_rules_configured` | Not available | Available | Available | Returns `MANUAL` when the firewall configuration cannot be assessed from the API |
| `team_saml_sso_enabled` | Not available | Available | Available | Checks team SAML SSO configuration |
| `team_saml_sso_enforced` | Not available | Available | Available | Checks SAML SSO enforcement for all team members |
| `team_directory_sync_enabled` | Not available | Not available | Available | Checks SCIM directory sync |
| `security_managed_rulesets_enabled` | Bot Protection and AI Bots managed rulesets | Bot Protection and AI Bots managed rulesets | All managed rulesets, including OWASP Core Ruleset | Returns `MANUAL` when the firewall configuration cannot be assessed from the API |
| `security_rate_limiting_configured` | Not available | Available | Available | Returns `MANUAL` when the firewall configuration cannot be assessed from the API |
| `security_waf_enabled` | Not available | Available | Available | Returns `MANUAL` when the firewall configuration cannot be assessed from the API |
<Note>
The five firewall-related checks (`security_waf_enabled`, `security_custom_rules_configured`, `security_ip_blocking_rules_configured`, `security_rate_limiting_configured`, and `security_managed_rulesets_enabled`) return `MANUAL` when the firewall configuration endpoint is not accessible from the API. The other 15 current Vercel checks do not currently include plan-specific handling in provider logic, but every Vercel check includes exactly one billing-plan metadata category (`vercel-hobby-plan`, `vercel-pro-plan`, or `vercel-enterprise-plan`) alongside its functional security category.
</Note>
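
A hypothetical sketch of the plan-aware behavior the table summarizes, not Prowler's actual check logic: firewall checks fall back to `MANUAL` when the configuration is not accessible from the API, and findings carry plan context when the billing plan is known.

```python
def firewall_check_status(config_accessible: bool, feature_enabled: bool,
                          billing_plan: str | None) -> tuple[str, str]:
    """Illustrative only; names and structure are made up for this sketch."""
    if not config_accessible:
        return ("MANUAL",
                "Firewall configuration could not be assessed from the API; "
                "verify it in the Vercel dashboard.")
    status = "PASS" if feature_enabled else "FAIL"
    note = f" (billing plan: {billing_plan})" if billing_plan else ""
    return (status, f"Feature is {'enabled' if feature_enabled else 'disabled'}{note}")


assert firewall_check_status(False, True, "pro")[0] == "MANUAL"
assert firewall_check_status(True, False, None)[0] == "FAIL"
```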
@@ -1,51 +0,0 @@
---
title: 'Prowler Check Kreator'
---
<Note>
Currently, this tool is only available for creating checks for the AWS provider.
</Note>
<Note>
If you are looking for a way to create new checks for all the supported providers, you can use [Prowler Studio](https://github.com/prowler-cloud/prowler-studio), an AI-powered toolkit for generating and managing security checks for Prowler and an improved successor to the Check Kreator.
</Note>
## Introduction
**Prowler Check Kreator** is a utility designed to streamline the creation of new checks for Prowler. This tool generates all necessary files required to add a new check to the Prowler repository. Specifically, it creates:
- A dedicated folder for the check.
- The main check script.
- A metadata file with essential details.
- A folder and file structure for testing the check.
## Usage
To use the tool, execute the main script with the following command:
```bash
python util/prowler_check_kreator/prowler_check_kreator.py <prowler_provider> <check_name>
```
Parameters:
- `<prowler_provider>`: Currently only AWS is supported.
- `<check_name>`: The name you wish to assign to the new check.
## AI integration
This tool optionally integrates AI to assist in generating the check code and metadata file content. When AI assistance is chosen, the tool uses [Gemini](https://gemini.google.com/) to produce preliminary code and metadata.
<Note>
For this feature to work, you must have the library `google-generativeai` installed in your Python environment.
</Note>
<Warning>
AI-generated code and metadata might contain errors or require adjustments to align with specific Prowler requirements. Carefully review all AI-generated content before committing.
</Warning>
To enable AI assistance, simply confirm when prompted by the tool. Additionally, ensure that the `GEMINI_API_KEY` environment variable is set with a valid Gemini API key. For instructions on obtaining your API key, refer to the [Gemini documentation](https://ai.google.dev/gemini-api/docs/api-key).
+15 -2
@@ -9,12 +9,13 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `bedrock_guardrails_configured` check for AWS provider [(#10844)](https://github.com/prowler-cloud/prowler/pull/10844)
- Universal compliance pipeline integrated into the CLI: `--list-compliance` and `--list-compliance-requirements` show universal frameworks, and CSV plus OCSF outputs are generated for any framework declaring a `TableConfig` [(#10301)](https://github.com/prowler-cloud/prowler/pull/10301)
- ASD Essential Eight Maturity Model compliance framework for AWS (Maturity Level One, Nov 2023) [(#10808)](https://github.com/prowler-cloud/prowler/pull/10808)
- Vercel checks now return a billing-plan-aware extended finding status and are classified with billing-plan categories [(#10663)](https://github.com/prowler-cloud/prowler/pull/10663)
- `bedrock_prompt_management_exists` check for AWS provider [(#10878)](https://github.com/prowler-cloud/prowler/pull/10878)
### 🔄 Changed
- `route53_dangling_ip_subdomain_takeover` now also flags `CNAME` records pointing to S3 website endpoints whose buckets are missing from the account [(#10920)](https://github.com/prowler-cloud/prowler/pull/10920)
- Azure Network Watcher flow log checks now require workspace-backed Traffic Analytics for `network_flow_log_captured_sent` and align metadata with VNet-compatible flow log guidance [(#10645)](https://github.com/prowler-cloud/prowler/pull/10645)
- Azure compliance entries for legacy Network Watcher flow log controls now use retirement-aware guidance and point new deployments to VNet flow logs
- Azure compliance entries for legacy Network Watcher flow log controls now use retirement-aware guidance and point new deployments to VNet flow logs [(#10937)](https://github.com/prowler-cloud/prowler/pull/10937)
- AWS CodeBuild service now batches `BatchGetProjects` and `BatchGetBuilds` calls per region (up to 100 items per call) to reduce API call volume and prevent throttling-induced false positives in `codebuild_project_not_publicly_accessible` [(#10639)](https://github.com/prowler-cloud/prowler/pull/10639)
- `display_compliance_table` dispatch switched from substring `in` checks to `startswith` to prevent false matches between similarly named frameworks (e.g. `cisa` vs `cis`) [(#10301)](https://github.com/prowler-cloud/prowler/pull/10301)
@@ -22,6 +23,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- AWS SDK test isolation: autouse `mock_aws` fixture and leak detector in `conftest.py` to prevent tests from hitting real AWS endpoints, with idempotent organization setup for tests calling `set_mocked_aws_provider` multiple times [(#10605)](https://github.com/prowler-cloud/prowler/pull/10605)
- AWS `boto` user agent extra is now applied to every client [(#10944)](https://github.com/prowler-cloud/prowler/pull/10944)
- Image provider connection check no longer fails with a misleading `host='https'` resolution error when the registry URL includes an `http://` or `https://` scheme prefix [(#10950)](https://github.com/prowler-cloud/prowler/pull/10950)
### 🔐 Security
@@ -29,6 +31,17 @@ All notable changes to the **Prowler SDK** are documented in this file.
---
## [5.25.2] (Prowler v5.25.2)
### 🐞 Fixed
- `route53_dangling_ip_subdomain_takeover` now also flags `CNAME` records pointing to S3 website endpoints whose buckets are missing from the account [(#10920)](https://github.com/prowler-cloud/prowler/pull/10920)
- Duplicate Kubernetes RBAC findings when the same User or Group subject appeared in multiple ClusterRoleBindings [(#10242)](https://github.com/prowler-cloud/prowler/pull/10242)
- Match K8s RBAC rules by `apiGroup` [(#10969)](https://github.com/prowler-cloud/prowler/pull/10969)
- Return a compact actor name from CloudTrail `userIdentity` events [(#10986)](https://github.com/prowler-cloud/prowler/pull/10986)
---
## [5.25.1] (Prowler v5.25.1)
### 🐞 Fixed
@@ -2897,6 +2897,7 @@
"bedrock_guardrails_configured",
"bedrock_model_invocation_logging_enabled",
"bedrock_model_invocation_logs_encryption_enabled",
"bedrock_prompt_management_exists",
"cloudformation_stack_outputs_find_secrets",
"cloudfront_distributions_custom_ssl_certificate",
"cloudfront_distributions_default_root_object",
@@ -2901,6 +2901,7 @@
"bedrock_guardrails_configured",
"bedrock_model_invocation_logging_enabled",
"bedrock_model_invocation_logs_encryption_enabled",
"bedrock_prompt_management_exists",
"cloudformation_stack_outputs_find_secrets",
"cloudfront_distributions_custom_ssl_certificate",
"cloudfront_distributions_default_root_object",
+16 -12
@@ -62,6 +62,9 @@ VALID_CATEGORIES = frozenset(
"e5",
"privilege-escalation",
"ec2-imdsv1",
"vercel-hobby-plan",
"vercel-pro-plan",
"vercel-enterprise-plan",
}
)
@@ -244,14 +247,15 @@ class CheckMetadata(BaseModel):
# store the compliance later if supplied
Compliance: Optional[list[Any]] = Field(default_factory=list)
# TODO: Remove noqa and fix cls vulture errors
@validator("Categories", each_item=True, pre=True, always=True)
def valid_category(cls, value, values):
def valid_category(cls, value, values): # noqa: F841
if not isinstance(value, str):
raise ValueError("Categories must be a list of strings")
value_lower = value.lower()
if not re.match("^[a-z0-9-]+$", value_lower):
raise ValueError(
f"Invalid category: {value}. Categories can only contain lowercase letters, numbers and hyphen '-'"
f"Invalid category: {value}. Categories can only contain lowercase letters, numbers, and hyphen '-'"
)
if (
value_lower not in VALID_CATEGORIES
@@ -279,7 +283,7 @@ class CheckMetadata(BaseModel):
return resource_type
@validator("ServiceName", pre=True, always=True)
def validate_service_name(cls, service_name, values):
def validate_service_name(cls, service_name, values): # noqa: F841
if not service_name:
raise ValueError("ServiceName must be a non-empty string")
@@ -296,7 +300,7 @@ class CheckMetadata(BaseModel):
return service_name
@validator("CheckID", pre=True, always=True)
def valid_check_id(cls, check_id, values):
def valid_check_id(cls, check_id, values): # noqa: F841
if not check_id:
raise ValueError("CheckID must be a non-empty string")
@@ -309,7 +313,7 @@ class CheckMetadata(BaseModel):
return check_id
@validator("CheckTitle", pre=True, always=True)
def validate_check_title(cls, check_title, values):
def validate_check_title(cls, check_title, values): # noqa: F841
if values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
if len(check_title) > 150:
raise ValueError(
@@ -322,13 +326,13 @@ class CheckMetadata(BaseModel):
return check_title
@validator("RelatedUrl", pre=True, always=True)
def validate_related_url(cls, related_url, values):
def validate_related_url(cls, related_url, values): # noqa: F841
if related_url and values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
raise ValueError("RelatedUrl must be empty. This field is deprecated.")
return related_url
@validator("Remediation")
def validate_recommendation_url(cls, remediation, values):
def validate_recommendation_url(cls, remediation, values): # noqa: F841
if values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
url = remediation.Recommendation.Url
if url and not url.startswith("https://hub.prowler.com/"):
@@ -338,7 +342,7 @@ class CheckMetadata(BaseModel):
return remediation
@validator("CheckType", pre=True, always=True)
def validate_check_type(cls, check_type, values):
def validate_check_type(cls, check_type, values): # noqa: F841
provider = values.get("Provider", "").lower()
# Non-AWS providers must have an empty CheckType list
@@ -367,7 +371,7 @@ class CheckMetadata(BaseModel):
return check_type
@validator("Description", pre=True, always=True)
def validate_description(cls, description, values):
def validate_description(cls, description, values): # noqa: F841
if values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
if len(description) > 400:
raise ValueError(
@@ -376,7 +380,7 @@ class CheckMetadata(BaseModel):
return description
@validator("Risk", pre=True, always=True)
def validate_risk(cls, risk, values):
def validate_risk(cls, risk, values): # noqa: F841
if values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
if len(risk) > 400:
raise ValueError(
@@ -385,7 +389,7 @@ class CheckMetadata(BaseModel):
return risk
@validator("ResourceGroup", pre=True, always=True)
def validate_resource_group(cls, resource_group):
def validate_resource_group(cls, resource_group): # noqa: F841
if resource_group and resource_group not in VALID_RESOURCE_GROUPS:
raise ValueError(
f"Invalid ResourceGroup: '{resource_group}'. Must be one of: {', '.join(sorted(VALID_RESOURCE_GROUPS))} or empty string."
@@ -393,7 +397,7 @@ class CheckMetadata(BaseModel):
return resource_group
@validator("AdditionalURLs", pre=True, always=True)
def validate_additional_urls(cls, additional_urls):
def validate_additional_urls(cls, additional_urls): # noqa: F841
if not isinstance(additional_urls, list):
raise ValueError("AdditionalURLs must be a list")
@@ -221,27 +221,12 @@ class CloudTrailTimeline(TimelineService):
@staticmethod
def _extract_actor(user_identity: Dict[str, Any]) -> str:
"""Extract a human-readable actor name from CloudTrail userIdentity."""
# Try ARN first - most reliable
"""Return a compact actor name from CloudTrail userIdentity.
For ARNs, returns the resource portion (everything after the last
`:`) — e.g. `user/alice`, `assumed-role/MyRole/session-name`,
`root`. The full ARN is preserved separately in `actor_uid`.
"""
if arn := user_identity.get("arn"):
if "/" in arn:
parts = arn.split("/")
# For assumed-role, return the role name (second-to-last part)
if "assumed-role" in arn and len(parts) >= 2:
return parts[-2]
return parts[-1]
return arn.split(":")[-1]
# Fall back to userName
if username := user_identity.get("userName"):
return username
# Fall back to principalId
if principal_id := user_identity.get("principalId"):
return principal_id
# For service-invoked actions
if invoking_service := user_identity.get("invokedBy"):
return invoking_service
return "Unknown"
return arn.rsplit(":", 1)[-1]
return user_identity.get("invokedBy") or "Unknown"
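
For illustration, a standalone copy of the compacted actor logic above with representative (made-up) `userIdentity` payloads:

```python
from typing import Any, Dict


def extract_actor(user_identity: Dict[str, Any]) -> str:
    """Standalone copy of the compact-actor logic shown in the diff above."""
    if arn := user_identity.get("arn"):
        # Keep only the resource portion after the last ':' of the ARN.
        return arn.rsplit(":", 1)[-1]
    return user_identity.get("invokedBy") or "Unknown"


assert extract_actor({"arn": "arn:aws:iam::123456789012:user/alice"}) == "user/alice"
assert extract_actor(
    {"arn": "arn:aws:sts::123456789012:assumed-role/MyRole/session-name"}
) == "assumed-role/MyRole/session-name"
assert extract_actor({"arn": "arn:aws:iam::123456789012:root"}) == "root"
assert extract_actor({"invokedBy": "cloudformation.amazonaws.com"}) == "cloudformation.amazonaws.com"
assert extract_actor({}) == "Unknown"
```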
@@ -0,0 +1,39 @@
{
"Provider": "aws",
"CheckID": "bedrock_prompt_management_exists",
"CheckTitle": "Amazon Bedrock Prompt Management prompts exist in the region",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices"
],
"ServiceName": "bedrock",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "low",
"ResourceType": "Other",
"ResourceGroup": "ai_ml",
"Description": "**Bedrock Prompt Management** enables centralized creation, versioning, and governance of prompts used with foundation models.\n\nThis region-level check verifies whether at least one managed prompt exists in each scanned region, used as an adoption signal for Prompt Management. The presence of a prompt does not by itself guarantee that every application prompt is managed.",
"Risk": "Without **Prompt Management**, prompts are scattered across applications with no central oversight, versioning, or auditability over instructions sent to foundation models, weakening governance and compliance posture.\n\nManaged prompts are a governance enabler; **prompt injection** defenses are provided by Bedrock **guardrails**, covered by separate checks.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.aws.amazon.com/bedrock/latest/userguide/prompt-management.html",
"https://docs.aws.amazon.com/bedrock/latest/userguide/prompt-management-create.html"
],
"Remediation": {
"Code": {
"CLI": "aws bedrock-agent create-prompt --name example_prompt --default-variant default --variants '[{\"name\":\"default\",\"templateType\":\"TEXT\",\"templateConfiguration\":{\"text\":{\"text\":\"Your prompt template here.\"}}}]'",
"NativeIaC": "",
"Other": "1. Open the Amazon Bedrock console\n2. Navigate to Prompt Management\n3. Click Create prompt\n4. Provide a name and configure the prompt template (a prompt can contain at most one variant; additional variants are created via CreatePromptVersion)\n5. Save the prompt",
"Terraform": ""
},
"Recommendation": {
"Text": "Adopt **Bedrock Prompt Management** to centralize prompt definitions, enforce versioning, and maintain governance over model interactions.\n\nUse managed prompts with **guardrails** and apply **least privilege** access controls to restrict who can create or modify prompts.",
"Url": "https://hub.prowler.com/check/bedrock_prompt_management_exists"
}
},
"Categories": [
"gen-ai"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Results are generated per scanned region. Regions where `ListPrompts` cannot be queried are omitted from the findings."
}
@@ -0,0 +1,54 @@
"""Check for region-level Bedrock Prompt Management adoption."""
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.bedrock.bedrock_agent_client import (
bedrock_agent_client,
)
class bedrock_prompt_management_exists(Check):
"""Check whether Amazon Bedrock Prompt Management prompts exist in the region.
A region is reported only when ListPrompts succeeded for it; regions where
the API call failed (e.g. AccessDenied, unsupported region) are skipped at
the service layer and produce no finding.
- PASS: At least one managed prompt exists in the region (one finding per prompt).
- FAIL: No managed prompts exist in the region (one finding per region).
"""
def execute(self) -> list[Check_Report_AWS]:
"""Execute the Bedrock Prompt Management exists check.
Returns:
A list of reports containing the result of the check.
"""
findings = []
for region in sorted(bedrock_agent_client.prompt_scanned_regions):
regional_prompts = sorted(
(
prompt
for prompt in bedrock_agent_client.prompts.values()
if prompt.region == region
),
key=lambda prompt: prompt.name,
)
if regional_prompts:
for prompt in regional_prompts:
report = Check_Report_AWS(metadata=self.metadata(), resource=prompt)
report.status = "PASS"
report.status_extended = f"Bedrock Prompt Management prompt {prompt.name} exists in region {region}."
findings.append(report)
else:
report = Check_Report_AWS(metadata=self.metadata(), resource={})
report.region = region
report.resource_id = "prompt-management"
report.resource_arn = f"arn:{bedrock_agent_client.audited_partition}:bedrock:{region}:{bedrock_agent_client.audited_account}:prompt-management"
report.status = "FAIL"
report.status_extended = (
f"No Bedrock Prompt Management prompts exist in region {region}."
)
findings.append(report)
return findings
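
A standalone sketch of the per-region fan-out the check performs, using plain dicts in place of Prowler's client and report types (all names hypothetical):

```python
def prompt_findings(scanned_regions: set[str], prompts: list[dict]) -> list[dict]:
    """One PASS finding per prompt; one FAIL finding per empty scanned region."""
    findings = []
    for region in sorted(scanned_regions):
        regional = sorted(
            (p for p in prompts if p["region"] == region),
            key=lambda p: p["name"],
        )
        if regional:
            findings.extend(
                {"status": "PASS", "region": region, "resource": p["name"]}
                for p in regional
            )
        else:
            # Regions where ListPrompts failed never enter scanned_regions,
            # so they produce no finding at all.
            findings.append(
                {"status": "FAIL", "region": region, "resource": "prompt-management"}
            )
    return findings


assert prompt_findings({"us-east-1", "eu-west-1"},
                       [{"name": "p1", "region": "us-east-1"}]) == [
    {"status": "FAIL", "region": "eu-west-1", "resource": "prompt-management"},
    {"status": "PASS", "region": "us-east-1", "resource": "p1"},
]
```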
@@ -140,7 +140,10 @@ class BedrockAgent(AWSService):
# Call AWSService's __init__
super().__init__("bedrock-agent", provider)
self.agents = {}
self.prompts = {}
self.prompt_scanned_regions: set = set()
self.__threading_call__(self._list_agents)
self.__threading_call__(self._list_prompts)
self.__threading_call__(self._list_tags_for_resource, self.agents.values())
def _list_agents(self, regional_client):
@@ -167,7 +170,32 @@ class BedrockAgent(AWSService):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _list_prompts(self, regional_client):
"""List all prompts in a region.
Prompt Management is evaluated as a region-level adoption signal, so
prompt collection is intentionally not filtered by audit_resources.
"""
logger.info("Bedrock Agent - Listing Prompts...")
try:
paginator = regional_client.get_paginator("list_prompts")
for page in paginator.paginate():
for prompt in page.get("promptSummaries", []):
prompt_arn = prompt.get("arn", "")
self.prompts[prompt_arn] = Prompt(
id=prompt.get("id", ""),
name=prompt.get("name", ""),
arn=prompt_arn,
region=regional_client.region,
)
self.prompt_scanned_regions.add(regional_client.region)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _list_tags_for_resource(self, resource):
"""List tags for a Bedrock Agent resource."""
logger.info("Bedrock Agent - Listing Tags for Resource...")
try:
agent_tags = (
@@ -190,3 +218,12 @@ class Agent(BaseModel):
guardrail_id: Optional[str] = None
region: str
tags: Optional[list] = []
class Prompt(BaseModel):
"""Model representing a Bedrock Prompt Management prompt."""
id: str
name: str
arn: str
region: str
+15 -5
@@ -329,12 +329,21 @@ class ImageProvider(Provider):
"""Image provider doesn't need a session since it uses Trivy directly"""
return None
@staticmethod
def _strip_scheme(value: str) -> str:
"""Remove a leading http:// or https:// scheme from a registry input."""
for prefix in ("https://", "http://"):
if value.lower().startswith(prefix):
return value[len(prefix) :]
return value
@staticmethod
def _extract_registry(image: str) -> str | None:
"""Extract registry hostname from an image reference.
Returns None for Docker Hub images (no registry prefix).
"""
image = ImageProvider._strip_scheme(image)
parts = image.split("/")
if len(parts) >= 2 and ("." in parts[0] or ":" in parts[0]):
return parts[0]
@@ -348,6 +357,7 @@ class ImageProvider(Provider):
or "myregistry.com:5000" are registry URLs (dots in host, no slash).
Image references like "alpine:3.18" or "nginx" are not.
"""
image_uid = ImageProvider._strip_scheme(image_uid)
if "/" not in image_uid:
host_part = image_uid.split(":")[0]
if "." in host_part:
@@ -835,11 +845,9 @@ class ImageProvider(Provider):
image_ref = f"{repo}:{tag}"
else:
# OCI registries need the full host/repo:tag reference
registry_host = self.registry.rstrip("/")
for prefix in ("https://", "http://"):
if registry_host.startswith(prefix):
registry_host = registry_host[len(prefix) :]
break
registry_host = ImageProvider._strip_scheme(
self.registry.rstrip("/")
)
image_ref = f"{registry_host}/{repo}:{tag}"
discovered_images.append(image_ref)
@@ -977,6 +985,8 @@ class ImageProvider(Provider):
if not image:
return Connection(is_connected=False, error="Image name is required")
image = ImageProvider._strip_scheme(image)
# Registry URL (bare hostname) → test via OCI catalog
if ImageProvider._is_registry_url(image):
return ImageProvider._test_registry_connection(
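
Expected behavior of the `_strip_scheme` helper above, restated as a standalone copy with made-up registry names:

```python
def strip_scheme(value: str) -> str:
    """Standalone copy of the scheme-stripping helper shown above."""
    for prefix in ("https://", "http://"):
        if value.lower().startswith(prefix):
            return value[len(prefix):]
    return value


assert strip_scheme("https://myregistry.com:5000") == "myregistry.com:5000"
assert strip_scheme("HTTP://myregistry.com/app:1.0") == "myregistry.com/app:1.0"
assert strip_scheme("alpine:3.18") == "alpine:3.18"  # no scheme, unchanged
```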
@@ -1,36 +1,37 @@
def is_rule_allowing_permissions(rules, resources, verbs):
def is_rule_allowing_permissions(rules, resources, verbs, api_groups=("",)):
"""
Check Kubernetes role permissions.
Check whether any RBAC rule grants the specified verbs on the specified
resources within the specified API groups.
This function takes in Kubernetes role rules, resources, and verbs,
and checks if any of the rules grant permissions on the specified
resources with the specified verbs.
A rule matches when its `apiGroups` includes any of `api_groups` (or "*"),
its `resources` includes any of `resources` (or "*"), and its `verbs`
includes any of `verbs` (or "*").
Args:
rules (List[Rule]): The list of Kubernetes role rules.
resources (List[str]): The list of resources to check permissions for.
verbs (List[str]): The list of verbs to check permissions for.
rules (List[Rule]): RBAC rules from a Role or ClusterRole.
resources (List[str]): Resources (or sub-resources) to check.
verbs (List[str]): Verbs to check.
api_groups (Iterable[str]): API groups the resources live in. Defaults
to ("",), the core API group, which matches the most common case.
Pass an explicit value for resources outside the core group, e.g.
("admissionregistration.k8s.io",) for webhook configurations.
Returns:
bool: True if any of the rules grant permissions, False otherwise.
bool: True if any rule grants the permission, False otherwise.
"""
if rules:
# Iterate through each rule in the list of rules
for rule in rules:
# Ensure apiGroups are relevant ("" or "v1" for secrets)
if rule.apiGroups and all(api not in ["", "v1"] for api in rule.apiGroups):
continue # Skip rules with unrelated apiGroups
# Check if the rule has resources, verbs, and matches any of the specified resources and verbs
if (
rule.resources
and (
any(resource in rule.resources for resource in resources)
or "*" in rule.resources
)
and rule.verbs
and (any(verb in rule.verbs for verb in verbs) or "*" in rule.verbs)
):
# If the rule matches, return True
return True
# If no rule matches, return False
if not rules:
return False
for rule in rules:
rule_api_groups = rule.apiGroups or [""]
if not (
any(g in rule_api_groups for g in api_groups) or "*" in rule_api_groups
):
continue
if (
rule.resources
and (any(r in rule.resources for r in resources) or "*" in rule.resources)
and rule.verbs
and (any(v in rule.verbs for v in verbs) or "*" in rule.verbs)
):
return True
return False
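A quick usage sketch of the updated helper, mirroring the regression tests later in this diff:

rules = [
    Rule(
        resources=["validatingwebhookconfigurations"],
        verbs=["create"],
        apiGroups=["admissionregistration.k8s.io"],
    )
]
# Default api_groups=("",) targets the core group, so this does not match:
assert not is_rule_allowing_permissions(
    rules, ["validatingwebhookconfigurations"], ["create"]
)
# Passing the group explicitly does:
assert is_rule_allowing_permissions(
    rules,
    ["validatingwebhookconfigurations"],
    ["create"],
    ("admissionregistration.k8s.io",),
)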
@@ -6,29 +6,40 @@ from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client
verbs = ["update", "patch"]
resources = ["certificatesigningrequests/approval"]
api_groups = ["certificates.k8s.io"]
class rbac_minimize_csr_approval_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to update the CSR approval sub-resource."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(
cr.rules,
resources,
verbs,
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to update the CSR approval sub-resource."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to update the CSR approval sub-resource."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(
cr.rules, resources, verbs, api_groups
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to update the CSR approval sub-resource."
break
findings.append(report)
return findings
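The grouping built above keeps one entry per unique subject and accumulates every bound ClusterRole name, so each User or Group yields exactly one finding regardless of how many bindings reference it. A self-contained sketch of just that structure, using a hypothetical stand-in for the subject model:

from collections import namedtuple

Subject = namedtuple("Subject", ["kind", "name", "namespace"])  # hypothetical stand-in

bindings = [  # (subject, crb.roleRef.name) pairs, as iterated from the CRBs
    (Subject("User", "alice", None), "csr-approver"),
    (Subject("User", "alice", None), "view"),
    (Subject("Group", "ops", None), "cluster-admin"),
]
subjects_bound_roles = {}
for subject, role_name in bindings:
    key = (subject.kind, subject.name, subject.namespace)
    if key not in subjects_bound_roles:
        subjects_bound_roles[key] = (subject, set())
    subjects_bound_roles[key][1].add(role_name)

assert subjects_bound_roles[("User", "alice", None)][1] == {"csr-approver", "view"}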
@@ -11,20 +11,32 @@ resources = ["nodes/proxy"]
class rbac_minimize_node_proxy_subresource_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to the node proxy sub-resource."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to the node proxy sub-resource."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to the node proxy sub-resource."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to the node proxy sub-resource."
break
findings.append(report)
return findings
@@ -11,21 +11,32 @@ resources = ["persistentvolumes"]
class rbac_minimize_pv_creation_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Check each ClusterRoleBinding for access to create PersistentVolumes
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create PersistentVolumes."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create PersistentVolumes."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create PersistentVolumes."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create PersistentVolumes."
break
findings.append(report)
return findings
@@ -11,20 +11,32 @@ resources = ["serviceaccounts/token"]
class rbac_minimize_service_account_token_creation(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create service account tokens."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create service account tokens."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create service account tokens."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create service account tokens."
break
findings.append(report)
return findings
@@ -9,29 +9,40 @@ resources = [
"mutatingwebhookconfigurations",
]
verbs = ["create", "update", "delete"]
api_groups = ["admissionregistration.k8s.io"]
class rbac_minimize_webhook_config_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create, update, or delete webhook configurations."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(
cr.rules,
resources,
verbs,
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create, update, or delete webhook configurations."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create, update, or delete webhook configurations."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(
cr.rules, resources, verbs, api_groups
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create, update, or delete webhook configurations."
break
findings.append(report)
return findings
@@ -0,0 +1,27 @@
from typing import Optional
def extract_billing_plan(data: Optional[dict]) -> Optional[str]:
"""Return the Vercel billing plan from a user or team payload.
Vercel's REST API consistently returns the plan identifier at
``data["billing"]["plan"]`` (e.g. ``"hobby"``, ``"pro"``, ``"enterprise"``)
on both ``GET /v2/user`` and ``GET /v2/teams`` responses, even though the
field is not part of the public OpenAPI schema.
"""
if not isinstance(data, dict):
return None
billing = data.get("billing")
if not isinstance(billing, dict):
return None
plan = billing.get("plan")
return plan.lower() if isinstance(plan, str) else None
def plan_reason_suffix(
billing_plan: Optional[str], unsupported_plans: set[str], explanation: str
) -> str:
"""Return a plan-based explanation suffix only when the plan proves it."""
if billing_plan in unsupported_plans:
return f" This may be expected because {explanation}"
return ""
@@ -84,10 +84,10 @@ class VercelService:
)
if response.status_code == 403:
# Plan limitation or permission error — return None for graceful handling
logger.warning(
# Endpoint unavailable for this token/scope; let checks handle it gracefully
logger.info(
f"{self.service} - Access denied for {path} (403). "
"This may be a plan limitation."
"This may be caused by plan or permission restrictions."
)
return None
@@ -21,6 +21,7 @@ class VercelTeamInfo(BaseModel):
id: str
name: str
slug: str
billing_plan: Optional[str] = None
class VercelIdentityInfo(BaseModel):
@@ -29,9 +30,27 @@ class VercelIdentityInfo(BaseModel):
user_id: Optional[str] = None
username: Optional[str] = None
email: Optional[str] = None
billing_plan: Optional[str] = None
team: Optional[VercelTeamInfo] = None
teams: list[VercelTeamInfo] = Field(default_factory=list)
def get_billing_plan_for(self, scope_id: Optional[str]) -> Optional[str]:
"""Return the billing plan for an explicit user or team scope."""
if not scope_id:
return None
if self.team and self.team.id == scope_id and self.team.billing_plan:
return self.team.billing_plan
for team in self.teams:
if team.id == scope_id:
return team.billing_plan
if self.user_id == scope_id:
return self.billing_plan
return None
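Hypothetical wiring of the resolver above (any required model fields not visible in this hunk are omitted):

team = VercelTeamInfo(id="team_123", name="Acme", slug="acme", billing_plan="pro")
identity = VercelIdentityInfo(
    user_id="user_1", billing_plan="hobby", team=team, teams=[team]
)
assert identity.get_billing_plan_for("team_123") == "pro"  # team scope wins
assert identity.get_billing_plan_for("user_1") == "hobby"  # user-scope fallback
assert identity.get_billing_plan_for(None) is None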
class VercelOutputOptions(ProviderOutputOptions):
"""Customize output filenames for Vercel scans."""
@@ -28,7 +28,8 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,7 +28,8 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,7 +28,8 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [],
@@ -28,7 +28,8 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,7 +28,8 @@
}
},
"Categories": [
"encryption"
"encryption",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,7 +28,8 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,7 +28,8 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [],
@@ -28,7 +28,8 @@
}
},
"Categories": [
"internet-exposed"
"internet-exposed",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,7 +28,8 @@
}
},
"Categories": [
"internet-exposed"
"internet-exposed",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [],
@@ -28,7 +28,8 @@
}
},
"Categories": [
"secrets"
"secrets",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,7 +28,8 @@
}
},
"Categories": [
"secrets"
"secrets",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,7 +28,8 @@
}
},
"Categories": [
"secrets"
"secrets",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [
@@ -28,7 +28,8 @@
}
},
"Categories": [
"internet-exposed"
"internet-exposed",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [],
@@ -28,11 +28,12 @@
}
},
"Categories": [
"internet-exposed"
"internet-exposed",
"vercel-pro-plan"
],
"DependsOn": [],
"RelatedTo": [
"project_deployment_protection_enabled"
],
"Notes": ""
"Notes": "Required billing plan: Enterprise, or as a paid add-on for Pro plans."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.project.project_client import project_client
@@ -38,6 +39,7 @@ class project_password_protection_enabled(Check):
report.status_extended = (
f"Project {project.name} does not have password protection "
f"configured for deployments."
f"{plan_reason_suffix(project.billing_plan, {'hobby'}, 'password protection is not available on the Vercel Hobby plan.')}"
)
findings.append(report)
@@ -28,11 +28,12 @@
}
},
"Categories": [
"internet-exposed"
"internet-exposed",
"vercel-pro-plan"
],
"DependsOn": [],
"RelatedTo": [
"project_deployment_protection_enabled"
],
"Notes": ""
"Notes": "Protecting production deployments requires Enterprise, or Pro plans with supported paid deployment protection options."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.project.project_client import project_client
@@ -38,6 +39,7 @@ class project_production_deployment_protection_enabled(Check):
report.status_extended = (
f"Project {project.name} does not have deployment protection "
f"enabled on production deployments."
f"{plan_reason_suffix(project.billing_plan, {'hobby'}, 'protecting production deployments is not available on the Vercel Hobby plan.')}"
)
findings.append(report)
@@ -20,6 +20,7 @@ class Project(VercelService):
"""List all projects, optionally filtered by --project argument."""
try:
raw_projects = self._paginate("/v9/projects", "projects")
identity = getattr(self.provider, "identity", None)
filter_projects = self.provider.filter_projects
seen_ids: set[str] = set()
@@ -57,10 +58,17 @@ class Project(VercelService):
pwd_protection = proj.get("passwordProtection")
security = proj.get("security", {}) or {}
project_team_id = proj.get("accountId") or self.provider.session.team_id
self.projects[project_id] = VercelProject(
id=project_id,
name=project_name,
team_id=proj.get("accountId") or self.provider.session.team_id,
team_id=project_team_id,
billing_plan=(
identity.get_billing_plan_for(project_team_id)
if identity
else None
),
framework=proj.get("framework"),
node_version=proj.get("nodeVersion"),
auto_expose_system_envs=proj.get("autoExposeSystemEnvs", False),
@@ -160,6 +168,7 @@ class VercelProject(BaseModel):
id: str
name: str
team_id: Optional[str] = None
billing_plan: Optional[str] = None
framework: Optional[str] = None
node_version: Optional[str] = None
auto_expose_system_envs: bool = False
@@ -28,9 +28,10 @@
}
},
"Categories": [
"resilience"
"resilience",
"vercel-pro-plan"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
"Notes": "Required billing plan: Pro or Enterprise."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.project.project_client import project_client
@@ -34,6 +35,7 @@ class project_skew_protection_enabled(Check):
report.status_extended = (
f"Project {project.name} does not have skew protection enabled, "
f"which may cause version mismatches during deployments."
f"{plan_reason_suffix(project.billing_plan, {'hobby'}, 'skew protection is not available on the Vercel Hobby plan.')}"
)
findings.append(report)
@@ -28,11 +28,12 @@
}
},
"Categories": [
"internet-exposed"
"internet-exposed",
"vercel-pro-plan"
],
"DependsOn": [],
"RelatedTo": [
"security_waf_enabled"
],
"Notes": ""
"Notes": "Required billing plan: Pro or Enterprise."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.security.security_client import security_client
@@ -24,7 +25,16 @@ class security_custom_rules_configured(Check):
for config in security_client.firewall_configs.values():
report = CheckReportVercel(metadata=self.metadata(), resource=config)
if config.custom_rules:
if not config.firewall_config_accessible:
report.status = "MANUAL"
report.status_extended = (
f"Project {config.project_name} ({config.project_id}) "
f"could not be assessed for custom firewall rules because the "
f"firewall configuration endpoint was not accessible. "
f"Manual verification is required."
f"{plan_reason_suffix(config.billing_plan, {'hobby'}, 'custom firewall rules are not available on the Vercel Hobby plan.')}"
)
elif config.custom_rules:
report.status = "PASS"
report.status_extended = (
f"Project {config.project_name} ({config.project_id}) "
@@ -28,11 +28,12 @@
}
},
"Categories": [
"internet-exposed"
"internet-exposed",
"vercel-pro-plan"
],
"DependsOn": [],
"RelatedTo": [
"security_waf_enabled"
],
"Notes": ""
"Notes": "Required billing plan: Pro or Enterprise."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.security.security_client import security_client
@@ -25,7 +26,16 @@ class security_ip_blocking_rules_configured(Check):
for config in security_client.firewall_configs.values():
report = CheckReportVercel(metadata=self.metadata(), resource=config)
if config.ip_blocking_rules:
if not config.firewall_config_accessible:
report.status = "MANUAL"
report.status_extended = (
f"Project {config.project_name} ({config.project_id}) "
f"could not be assessed for IP blocking rules because the "
f"firewall configuration endpoint was not accessible. "
f"Manual verification is required."
f"{plan_reason_suffix(config.billing_plan, {'hobby'}, 'IP blocking rules are not available on the Vercel Hobby plan.')}"
)
elif config.ip_blocking_rules:
report.status = "PASS"
report.status_extended = (
f"Project {config.project_name} ({config.project_id}) "
@@ -9,7 +9,7 @@
"Severity": "high",
"ResourceType": "NotDefined",
"ResourceGroup": "security",
"Description": "**Vercel projects** are assessed for **managed WAF ruleset** enablement. Managed rulesets are curated by Vercel and provide protection against known attack patterns including **OWASP Top 10** threats. This feature requires an Enterprise plan and reports MANUAL status when unavailable.",
"Description": "**Vercel projects** are assessed for **managed WAF ruleset** enablement. Managed rulesets are curated by Vercel and provide protection against known attack patterns including **OWASP Top 10** threats. Availability varies by ruleset, and the check reports MANUAL when the firewall configuration cannot be assessed from the API.",
"Risk": "Without **managed rulesets** enabled, the firewall lacks curated protection rules against well-known attack patterns. The application relies solely on custom rules, which may miss **new or evolving threats** that managed rulesets are designed to detect and block automatically.",
"RelatedUrl": "",
"AdditionalURLs": [
@@ -19,20 +19,21 @@
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the Vercel dashboard\n2. Navigate to the project Settings > Security > Firewall\n3. Enable managed rulesets from the available options\n4. Review and configure ruleset sensitivity levels\n5. Note: This feature requires an Enterprise plan",
"Other": "1. Sign in to the Vercel dashboard\n2. Navigate to the project Settings > Security > Firewall\n3. Enable the managed rulesets that are available for your plan\n4. Review and configure ruleset sensitivity levels\n5. If the API does not expose firewall configuration for the project, verify the rulesets manually in the dashboard",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable managed WAF rulesets to benefit from Vercel-curated protection against common attack patterns. If you are on a plan that does not support managed rulesets, consider upgrading to the Enterprise plan for enhanced security features.",
"Text": "Enable the managed WAF rulesets that are available for your Vercel plan to benefit from curated protection against common attack patterns. If the API does not expose firewall configuration for the project, verify the rulesets manually in the dashboard.",
"Url": "https://hub.prowler.com/check/security_managed_rulesets_enabled"
}
},
"Categories": [
"internet-exposed"
"internet-exposed",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [
"security_waf_enabled"
],
"Notes": "This check is plan-gated. If the Vercel API returns a 403 for managed rulesets, the check reports MANUAL status indicating that an Enterprise plan is required."
"Notes": "Managed ruleset availability varies by ruleset. OWASP Core Ruleset requires Enterprise, while Bot Protection and AI Bots managed rulesets are available on all plans."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.security.security_client import security_client
@@ -17,8 +18,8 @@ class security_managed_rulesets_enabled(Check):
"""Execute the Vercel Managed Rulesets Enabled check.
Iterates over all firewall configurations and checks if managed
rulesets are enabled. Reports MANUAL status when the feature is
not available due to plan limitations.
rulesets are enabled. Reports MANUAL status when the firewall
configuration cannot be assessed from the API.
Returns:
List[CheckReportVercel]: A list of reports for each project.
@@ -27,12 +28,14 @@ class security_managed_rulesets_enabled(Check):
for config in security_client.firewall_configs.values():
report = CheckReportVercel(metadata=self.metadata(), resource=config)
if config.managed_rulesets is None:
if not config.firewall_config_accessible:
report.status = "MANUAL"
report.status_extended = (
f"Project {config.project_name} ({config.project_id}) "
f"could not be assessed for managed rulesets. "
f"Enterprise plan required to access this feature."
f"could not be assessed for managed rulesets because the "
f"firewall configuration endpoint was not accessible. "
f"Manual verification is required."
f"{plan_reason_suffix(config.billing_plan, {'hobby', 'pro'}, 'some managed WAF rulesets, including the OWASP Core Ruleset, are only available on Vercel Enterprise plans.')}"
)
elif config.managed_rulesets:
report.status = "PASS"
@@ -28,11 +28,12 @@
}
},
"Categories": [
"internet-exposed"
"internet-exposed",
"vercel-pro-plan"
],
"DependsOn": [],
"RelatedTo": [
"security_waf_enabled"
],
"Notes": ""
"Notes": "Required billing plan: Pro or Enterprise."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.security.security_client import security_client
@@ -24,7 +25,16 @@ class security_rate_limiting_configured(Check):
for config in security_client.firewall_configs.values():
report = CheckReportVercel(metadata=self.metadata(), resource=config)
if config.rate_limiting_rules:
if not config.firewall_config_accessible:
report.status = "MANUAL"
report.status_extended = (
f"Project {config.project_name} ({config.project_id}) "
f"could not be assessed for rate limiting rules because the "
f"firewall configuration endpoint was not accessible. "
f"Manual verification is required."
f"{plan_reason_suffix(config.billing_plan, {'hobby'}, 'rate limiting rules are not available on the Vercel Hobby plan.')}"
)
elif config.rate_limiting_rules:
report.status = "PASS"
report.status_extended = (
f"Project {config.project_name} ({config.project_id}) "
@@ -29,11 +29,13 @@ class Security(VercelService):
data = self._read_firewall_config(project)
if data is None:
# 403 — plan limitation, store with managed_rulesets=None
# Firewall config endpoint unavailable for this project/token
self.firewall_configs[project.id] = VercelFirewallConfig(
project_id=project.id,
project_name=project.name,
team_id=project.team_id,
billing_plan=project.billing_plan,
firewall_config_accessible=False,
firewall_enabled=False,
managed_rulesets=None,
name=project.name,
@@ -49,6 +51,8 @@ class Security(VercelService):
project_id=project.id,
project_name=project.name,
team_id=project.team_id,
billing_plan=project.billing_plan,
firewall_config_accessible=True,
firewall_enabled=(
fallback_firewall_enabled
if fallback_firewall_enabled is not None
@@ -93,6 +97,8 @@ class Security(VercelService):
project_id=project.id,
project_name=project.name,
team_id=project.team_id,
billing_plan=project.billing_plan,
firewall_config_accessible=True,
firewall_enabled=firewall_enabled,
managed_rulesets=managed,
custom_rules=custom_rules,
@@ -246,8 +252,10 @@ class VercelFirewallConfig(BaseModel):
project_id: str
project_name: Optional[str] = None
team_id: Optional[str] = None
billing_plan: Optional[str] = None
firewall_config_accessible: bool = True
firewall_enabled: bool = False
managed_rulesets: Optional[dict] = None # None means plan-gated (403)
managed_rulesets: Optional[dict] = None # None means config endpoint unavailable
custom_rules: list[dict] = Field(default_factory=list)
ip_blocking_rules: list[dict] = Field(default_factory=list)
rate_limiting_rules: list[dict] = Field(default_factory=list)
@@ -28,12 +28,13 @@
}
},
"Categories": [
"internet-exposed"
"internet-exposed",
"vercel-pro-plan"
],
"DependsOn": [],
"RelatedTo": [
"security_managed_rulesets_enabled",
"security_custom_rules_configured"
],
"Notes": ""
"Notes": "Required billing plan: Pro or Enterprise."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.security.security_client import security_client
@@ -24,13 +25,15 @@ class security_waf_enabled(Check):
for config in security_client.firewall_configs.values():
report = CheckReportVercel(metadata=self.metadata(), resource=config)
if config.managed_rulesets is None:
# 403 — plan limitation, cannot determine WAF status
if not config.firewall_config_accessible:
# Firewall config could not be retrieved for this project
report.status = "MANUAL"
report.status_extended = (
f"Project {config.project_name} ({config.project_id}) "
f"could not be checked for WAF status due to plan limitations. "
f"could not be checked for WAF status because the firewall "
f"configuration endpoint was not accessible. "
f"Manual verification is required."
f"{plan_reason_suffix(config.billing_plan, {'hobby'}, 'the Web Application Firewall is not available on the Vercel Hobby plan.')}"
)
elif config.firewall_enabled:
report.status = "PASS"
@@ -29,11 +29,12 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-enterprise-plan"
],
"DependsOn": [],
"RelatedTo": [
"team_saml_sso_enabled"
],
"Notes": ""
"Notes": "Required billing plan: Enterprise."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.team.team_client import team_client
@@ -40,6 +41,7 @@ class team_directory_sync_enabled(Check):
report.status_extended = (
f"Team {team.name} does not have directory sync (SCIM) enabled. "
f"User provisioning and deprovisioning must be managed manually."
f"{plan_reason_suffix(team.billing_plan, {'hobby', 'pro'}, 'directory sync (SCIM) is only available on Vercel Enterprise plans.')}"
)
findings.append(report)
@@ -28,7 +28,8 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [],
@@ -28,7 +28,8 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-hobby-plan"
],
"DependsOn": [],
"RelatedTo": [],
@@ -29,11 +29,12 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-pro-plan"
],
"DependsOn": [],
"RelatedTo": [
"team_saml_sso_enforced"
],
"Notes": ""
"Notes": "Required billing plan: Pro or Enterprise."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.team.team_client import team_client
@@ -38,6 +39,7 @@ class team_saml_sso_enabled(Check):
report.status = "FAIL"
report.status_extended = (
f"Team {team.name} does not have SAML SSO enabled."
f"{plan_reason_suffix(team.billing_plan, {'hobby'}, 'SAML SSO is not available on the Vercel Hobby plan.')}"
)
findings.append(report)
@@ -29,11 +29,12 @@
}
},
"Categories": [
"trust-boundaries"
"trust-boundaries",
"vercel-pro-plan"
],
"DependsOn": [],
"RelatedTo": [
"team_saml_sso_enabled"
],
"Notes": ""
"Notes": "Required billing plan: Pro or Enterprise."
}
@@ -1,6 +1,7 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.lib.billing import plan_reason_suffix
from prowler.providers.vercel.services.team.team_client import team_client
@@ -43,6 +44,7 @@ class team_saml_sso_enforced(Check):
else:
report.status_extended = (
f"Team {team.name} does not have SAML SSO enforced."
f"{plan_reason_suffix(team.billing_plan, {'hobby'}, 'SAML SSO is not available on the Vercel Hobby plan.')}"
)
findings.append(report)
@@ -4,6 +4,7 @@ from typing import Optional
from pydantic import BaseModel, Field
from prowler.lib.logger import logger
from prowler.providers.vercel.lib.billing import extract_billing_plan
from prowler.providers.vercel.lib.service.service import VercelService
@@ -67,6 +68,7 @@ class Team(VercelService):
id=team_data.get("id", team_id),
name=team_data.get("name", ""),
slug=team_data.get("slug", ""),
billing_plan=extract_billing_plan(team_data),
saml=saml_config,
directory_sync_enabled=dir_sync,
created_at=created_at,
@@ -151,6 +153,7 @@ class VercelTeam(BaseModel):
id: str
name: str
slug: str
billing_plan: Optional[str] = None
saml: Optional[SAMLConfig] = None
directory_sync_enabled: bool = False
members: list[VercelTeamMember] = Field(default_factory=list)
@@ -20,6 +20,7 @@ from prowler.providers.vercel.exceptions.exceptions import (
VercelRateLimitError,
VercelSessionError,
)
from prowler.providers.vercel.lib.billing import extract_billing_plan
from prowler.providers.vercel.lib.mutelist.mutelist import VercelMutelist
from prowler.providers.vercel.models import (
VercelIdentityInfo,
@@ -195,6 +196,7 @@ class VercelProvider(Provider):
user_id = user_data.get("id")
username = user_data.get("username")
email = user_data.get("email")
billing_plan = extract_billing_plan(user_data)
# Get team info
team_info = None
@@ -214,6 +216,7 @@ class VercelProvider(Provider):
id=team_data.get("id", session.team_id),
name=team_data.get("name", ""),
slug=team_data.get("slug", ""),
billing_plan=extract_billing_plan(team_data),
)
all_teams = [team_info]
elif team_response.status_code in (404, 403):
@@ -239,6 +242,7 @@ class VercelProvider(Provider):
id=t.get("id", ""),
name=t.get("name", ""),
slug=t.get("slug", ""),
billing_plan=extract_billing_plan(t),
)
)
if all_teams:
@@ -253,6 +257,7 @@ class VercelProvider(Provider):
user_id=user_id,
username=username,
email=email,
billing_plan=billing_plan,
team=team_info,
teams=all_teams,
)
@@ -377,6 +377,50 @@ class TestCheckMetadataValidators:
check_metadata = CheckMetadata(**valid_metadata)
assert check_metadata.Categories == ["encryption", "logging", "secrets"]
def test_valid_vercel_plan_categories_success(self):
"""Test Vercel plan categories are accepted using hyphen-separated names."""
valid_metadata = {
"Provider": "vercel",
"CheckID": "test_check",
"CheckTitle": "Test Check",
"CheckType": [],
"ServiceName": "test",
"SubServiceName": "subtest",
"ResourceIdTemplate": "template",
"Severity": "high",
"ResourceType": "TestResource",
"Description": "Test description",
"Risk": "Test risk",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "test command",
"NativeIaC": "test native",
"Other": "test other",
"Terraform": "test terraform",
},
"Recommendation": {
"Text": "test recommendation",
"Url": "https://hub.prowler.com/check/test_check",
},
},
"Categories": [
"vercel-hobby-plan",
"vercel-pro-plan",
"vercel-enterprise-plan",
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Test notes",
}
check_metadata = CheckMetadata(**valid_metadata)
assert check_metadata.Categories == [
"vercel-hobby-plan",
"vercel-pro-plan",
"vercel-enterprise-plan",
]
def test_valid_category_failure_non_string(self):
"""Test valid category validation fails with non-string category"""
invalid_metadata = {
@@ -454,7 +498,7 @@ class TestCheckMetadataValidators:
with pytest.raises(ValidationError) as exc_info:
CheckMetadata(**invalid_metadata)
assert (
"Categories can only contain lowercase letters, numbers and hyphen"
"Categories can only contain lowercase letters, numbers, and hyphen '-'"
in str(exc_info.value)
)
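A hypothetical reimplementation of the validator behind that message, for illustration only (the regex and function name are assumptions):

import re

_CATEGORY_RE = re.compile(r"^[a-z0-9-]+$")

def validate_category(value):
    if not isinstance(value, str) or not _CATEGORY_RE.match(value):
        raise ValueError(
            "Categories can only contain lowercase letters, numbers, and hyphen '-'"
        )
    return value

validate_category("vercel-hobby-plan")  # accepted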
@@ -100,7 +100,7 @@ class TestCloudTrailTimeline:
assert len(result) == 1
assert result[0]["event_name"] == "RunInstances"
assert result[0]["actor"] == "admin"
assert result[0]["actor"] == "user/admin"
assert result[0]["source_ip_address"] == "203.0.113.1"
def test_get_resource_timeline_with_resource_uid(
@@ -304,14 +304,28 @@ class TestExtractActor:
"arn": "arn:aws:iam::123456789012:user/alice",
"userName": "alice",
}
assert CloudTrailTimeline._extract_actor(user_identity) == "alice"
assert CloudTrailTimeline._extract_actor(user_identity) == "user/alice"
def test_extract_actor_assumed_role(self):
user_identity = {
"type": "AssumedRole",
"arn": "arn:aws:sts::123456789012:assumed-role/MyRole/session-name",
}
assert CloudTrailTimeline._extract_actor(user_identity) == "MyRole"
assert (
CloudTrailTimeline._extract_actor(user_identity)
== "assumed-role/MyRole/session-name"
)
def test_extract_actor_assumed_role_sso(self):
"""SSO sessions store the user identity in the session name."""
user_identity = {
"type": "AssumedRole",
"arn": "arn:aws:sts::123456789012:assumed-role/AWSReservedSSO_AdministratorAccess_abcdef1234567890/user@example.com",
}
assert (
CloudTrailTimeline._extract_actor(user_identity)
== "assumed-role/AWSReservedSSO_AdministratorAccess_abcdef1234567890/user@example.com"
)
def test_extract_actor_root(self):
user_identity = {"type": "Root", "arn": "arn:aws:iam::123456789012:root"}
@@ -327,21 +341,33 @@ class TestExtractActor:
== "elasticloadbalancing.amazonaws.com"
)
def test_extract_actor_fallback_to_principal_id(self):
user_identity = {"type": "Unknown", "principalId": "AROAEXAMPLEID:session"}
assert (
CloudTrailTimeline._extract_actor(user_identity) == "AROAEXAMPLEID:session"
)
def test_extract_actor_unknown(self):
assert CloudTrailTimeline._extract_actor({}) == "Unknown"
def test_extract_actor_username_only_returns_unknown(self):
"""When userIdentity carries only userName/principalId (no arn or
invokedBy), we deliberately return "Unknown"; we rely on the ARN
from the upstream service for the actor."""
assert (
CloudTrailTimeline._extract_actor({"type": "IAMUser", "userName": "alice"})
== "Unknown"
)
assert (
CloudTrailTimeline._extract_actor(
{"type": "Unknown", "principalId": "AROAEXAMPLEID:session"}
)
== "Unknown"
)
def test_extract_actor_federated_user(self):
user_identity = {
"type": "FederatedUser",
"arn": "arn:aws:sts::123456789012:federated-user/developer",
}
assert CloudTrailTimeline._extract_actor(user_identity) == "developer"
assert (
CloudTrailTimeline._extract_actor(user_identity)
== "federated-user/developer"
)
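The expectations above imply the actor is now the full resource path of the identity ARN rather than a parsed name. A sketch reverse-engineered from these tests (the fallback order is an assumption, not the actual implementation):

@staticmethod
def _extract_actor(user_identity):
    if not user_identity:
        return "Unknown"
    if user_identity.get("invokedBy"):
        return user_identity["invokedBy"]  # e.g. a service principal
    arn = user_identity.get("arn")
    if arn:
        # Keep the whole resource path: "user/alice",
        # "assumed-role/MyRole/session-name", "federated-user/developer".
        return arn.split(":")[-1]
    return "Unknown"  # no ARN: userName/principalId alone are not trusted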
class TestParseEvent:
@@ -380,7 +406,7 @@ class TestParseEvent:
assert result is not None
assert result["event_name"] == "RunInstances"
assert result["event_source"] == "ec2.amazonaws.com"
assert result["actor"] == "admin"
assert result["actor"] == "user/admin"
assert result["actor_uid"] == "arn:aws:iam::123456789012:user/admin"
assert result["actor_type"] == "IAMUser"
@@ -424,7 +450,10 @@ class TestParseEvent:
"EventName": "RunInstances",
"EventSource": "ec2.amazonaws.com",
"CloudTrailEvent": {
"userIdentity": {"type": "IAMUser", "userName": "admin"},
"userIdentity": {
"type": "IAMUser",
"arn": "arn:aws:iam::123456789012:user/admin",
},
},
}
timeline = CloudTrailTimeline(session=mock_session)
@@ -432,7 +461,7 @@ class TestParseEvent:
assert result is not None
assert result["event_name"] == "RunInstances"
assert result["actor"] == "admin"
assert result["actor"] == "user/admin"
def test_parse_event_missing_event_id(self, mock_session):
"""Test parsing event without EventId returns None (event_id is required)."""
@@ -506,7 +535,7 @@ class TestParseEvent:
assert result is not None
assert result["event_name"] == "RunInstances"
assert result["actor"] == "admin"
assert result["actor"] == "user/admin"
# actor_type should be None when not present in userIdentity
assert result["actor_type"] is None
@@ -0,0 +1,280 @@
from unittest import mock
import botocore
from botocore.exceptions import ClientError
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
make_api_call = botocore.client.BaseClient._make_api_call
PROMPT_ARN = (
f"arn:aws:bedrock:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:prompt/test-prompt-id"
)
def mock_make_api_call_list_prompts_access_denied(self, operation_name, kwarg):
"""Mock API call where ListPrompts fails with AccessDeniedException."""
if operation_name == "ListPrompts":
raise ClientError(
{
"Error": {
"Code": "AccessDeniedException",
"Message": "User is not authorized to perform: bedrock:ListPrompts",
}
},
operation_name,
)
return make_api_call(self, operation_name, kwarg)
def mock_make_api_call_with_prompts(self, operation_name, kwarg):
"""Mock API call that returns prompts."""
if operation_name == "ListPrompts":
return {
"promptSummaries": [
{
"id": "test-prompt-id",
"name": "test-prompt",
"arn": PROMPT_ARN,
}
]
}
return make_api_call(self, operation_name, kwarg)
def mock_make_api_call_with_multiple_prompts(self, operation_name, kwarg):
"""Mock API call that returns multiple prompts."""
if operation_name == "ListPrompts":
return {
"promptSummaries": [
{
"id": "test-prompt-id-1",
"name": "test-prompt-1",
"arn": f"arn:aws:bedrock:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:prompt/test-prompt-id-1",
},
{
"id": "test-prompt-id-2",
"name": "test-prompt-2",
"arn": f"arn:aws:bedrock:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:prompt/test-prompt-id-2",
},
{
"id": "test-prompt-id-3",
"name": "test-prompt-3",
"arn": f"arn:aws:bedrock:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:prompt/test-prompt-id-3",
},
]
}
return make_api_call(self, operation_name, kwarg)
def mock_make_api_call_no_prompts(self, operation_name, kwarg):
"""Mock API call that returns no prompts."""
if operation_name == "ListPrompts":
return {"promptSummaries": []}
return make_api_call(self, operation_name, kwarg)
class Test_bedrock_prompt_management_exists:
@mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_no_prompts,
)
@mock_aws
def test_no_prompts(self):
"""Test FAIL when no prompts exist in the region."""
from prowler.providers.aws.services.bedrock.bedrock_service import BedrockAgent
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.bedrock.bedrock_prompt_management_exists.bedrock_prompt_management_exists.bedrock_agent_client",
new=BedrockAgent(aws_provider),
),
):
from prowler.providers.aws.services.bedrock.bedrock_prompt_management_exists.bedrock_prompt_management_exists import (
bedrock_prompt_management_exists,
)
check = bedrock_prompt_management_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"No Bedrock Prompt Management prompts exist in region {AWS_REGION_US_EAST_1}."
)
assert result[0].resource_id == "prompt-management"
assert result[0].region == AWS_REGION_US_EAST_1
assert (
result[0].resource_arn
== f"arn:aws:bedrock:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:prompt-management"
)
@mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_with_prompts,
)
@mock_aws
def test_prompts_exist(self):
"""Test PASS when prompts exist in the region."""
from prowler.providers.aws.services.bedrock.bedrock_service import BedrockAgent
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.bedrock.bedrock_prompt_management_exists.bedrock_prompt_management_exists.bedrock_agent_client",
new=BedrockAgent(aws_provider),
),
):
from prowler.providers.aws.services.bedrock.bedrock_prompt_management_exists.bedrock_prompt_management_exists import (
bedrock_prompt_management_exists,
)
check = bedrock_prompt_management_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Bedrock Prompt Management prompt test-prompt exists in region {AWS_REGION_US_EAST_1}."
)
assert result[0].resource_id == "test-prompt-id"
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_arn == PROMPT_ARN
@mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_with_multiple_prompts,
)
@mock_aws
def test_multiple_prompts_exist(self):
"""Test PASS with one finding per prompt when multiple prompts exist."""
from prowler.providers.aws.services.bedrock.bedrock_service import BedrockAgent
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.bedrock.bedrock_prompt_management_exists.bedrock_prompt_management_exists.bedrock_agent_client",
new=BedrockAgent(aws_provider),
),
):
from prowler.providers.aws.services.bedrock.bedrock_prompt_management_exists.bedrock_prompt_management_exists import (
bedrock_prompt_management_exists,
)
check = bedrock_prompt_management_exists()
result = check.execute()
assert len(result) == 3
for index, finding in enumerate(result, start=1):
expected_name = f"test-prompt-{index}"
expected_id = f"test-prompt-id-{index}"
assert finding.status == "PASS"
assert (
finding.status_extended
== f"Bedrock Prompt Management prompt {expected_name} exists in region {AWS_REGION_US_EAST_1}."
)
assert finding.resource_id == expected_id
assert finding.region == AWS_REGION_US_EAST_1
assert (
finding.resource_arn
== f"arn:aws:bedrock:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:prompt/{expected_id}"
)
@mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_no_prompts,
)
@mock_aws
def test_no_prompts_multiple_regions(self):
"""Test FAIL in multiple regions when no prompts exist."""
from prowler.providers.aws.services.bedrock.bedrock_service import BedrockAgent
aws_provider = set_mocked_aws_provider(
[AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1]
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.bedrock.bedrock_prompt_management_exists.bedrock_prompt_management_exists.bedrock_agent_client",
new=BedrockAgent(aws_provider),
),
):
from prowler.providers.aws.services.bedrock.bedrock_prompt_management_exists.bedrock_prompt_management_exists import (
bedrock_prompt_management_exists,
)
check = bedrock_prompt_management_exists()
result = check.execute()
assert len(result) == 2
for finding in result:
assert finding.status == "FAIL"
assert (
finding.status_extended
== f"No Bedrock Prompt Management prompts exist in region {finding.region}."
)
assert finding.resource_id == "prompt-management"
assert (
finding.resource_arn
== f"arn:aws:bedrock:{finding.region}:{AWS_ACCOUNT_NUMBER}:prompt-management"
)
regions = {finding.region for finding in result}
assert regions == {AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1}
@mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_list_prompts_access_denied,
)
@mock_aws
def test_list_prompts_client_error_skips_region(self):
"""Test that regions where ListPrompts fails produce no findings."""
from prowler.providers.aws.services.bedrock.bedrock_service import BedrockAgent
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.bedrock.bedrock_prompt_management_exists.bedrock_prompt_management_exists.bedrock_agent_client",
new=BedrockAgent(aws_provider),
),
):
from prowler.providers.aws.services.bedrock.bedrock_prompt_management_exists.bedrock_prompt_management_exists import (
bedrock_prompt_management_exists,
)
check = bedrock_prompt_management_exists()
result = check.execute()
assert result == []
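A self-contained sketch of the reporting shape these tests pin down (function and field names here are illustrative, not the check's code):

from types import SimpleNamespace

def prompt_findings(prompts, scanned_regions):
    # PASS per discovered prompt, FAIL per successfully scanned region with
    # no prompts, and nothing at all for regions where ListPrompts failed.
    findings = [("PASS", p.region, p.name) for p in prompts]
    regions_with_prompts = {p.region for p in prompts}
    for region in sorted(scanned_regions - regions_with_prompts):
        findings.append(("FAIL", region, None))
    return findings

prompts = [SimpleNamespace(region="us-east-1", name="test-prompt")]
print(prompt_findings(prompts, {"us-east-1", "eu-west-1"}))
# [('PASS', 'us-east-1', 'test-prompt'), ('FAIL', 'eu-west-1', None)]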
@@ -341,3 +341,125 @@ class TestBedrockAgentPagination:
# Verify paginator was used
regional_client.get_paginator.assert_called_once_with("list_agents")
paginator.paginate.assert_called_once()
class TestBedrockPromptPagination:
"""Test suite for Bedrock Prompt pagination logic."""
def test_list_prompts_pagination(self):
"""Test that list_prompts iterates through all pages."""
# Mock the audit_info
audit_info = MagicMock()
audit_info.audited_partition = "aws"
audit_info.audited_account = "123456789012"
audit_info.audit_resources = None
# Mock the regional client
regional_client = MagicMock()
regional_client.region = "us-east-1"
# Mock paginator
paginator = MagicMock()
page1 = {
"promptSummaries": [
{
"id": "prompt-1",
"name": "prompt-name-1",
"arn": "arn:aws:bedrock:us-east-1:123456789012:prompt/prompt-1",
}
]
}
page2 = {
"promptSummaries": [
{
"id": "prompt-2",
"name": "prompt-name-2",
"arn": "arn:aws:bedrock:us-east-1:123456789012:prompt/prompt-2",
}
]
}
paginator.paginate.return_value = [page1, page2]
regional_client.get_paginator.return_value = paginator
# Initialize service and inject mock client
bedrock_agent_service = BedrockAgent(audit_info)
bedrock_agent_service.regional_clients = {"us-east-1": regional_client}
bedrock_agent_service.prompts = {} # Clear init side effects
bedrock_agent_service.prompt_scanned_regions = set()
# Run method
bedrock_agent_service._list_prompts(regional_client)
# Assertions
assert len(bedrock_agent_service.prompts) == 2
assert (
"arn:aws:bedrock:us-east-1:123456789012:prompt/prompt-1"
in bedrock_agent_service.prompts
)
assert (
"arn:aws:bedrock:us-east-1:123456789012:prompt/prompt-2"
in bedrock_agent_service.prompts
)
assert "us-east-1" in bedrock_agent_service.prompt_scanned_regions
# Verify paginator was used
regional_client.get_paginator.assert_called_once_with("list_prompts")
paginator.paginate.assert_called_once()
def test_list_prompts_ignores_audit_resources_filter(self):
"""Prompt collection is region-scoped and must ignore audit_resources."""
audit_info = MagicMock()
audit_info.audited_partition = "aws"
audit_info.audited_account = "123456789012"
audit_info.audit_resources = ["arn:aws:s3:::unrelated-resource"]
regional_client = MagicMock()
regional_client.region = "us-east-1"
paginator = MagicMock()
paginator.paginate.return_value = [
{
"promptSummaries": [
{
"id": "prompt-1",
"name": "prompt-name-1",
"arn": "arn:aws:bedrock:us-east-1:123456789012:prompt/prompt-1",
}
]
}
]
regional_client.get_paginator.return_value = paginator
bedrock_agent_service = BedrockAgent(audit_info)
bedrock_agent_service.regional_clients = {"us-east-1": regional_client}
bedrock_agent_service.prompts = {}
bedrock_agent_service.prompt_scanned_regions = set()
bedrock_agent_service._list_prompts(regional_client)
assert len(bedrock_agent_service.prompts) == 1
assert "us-east-1" in bedrock_agent_service.prompt_scanned_regions
def test_list_prompts_error_does_not_mark_region_scanned(self):
"""If ListPrompts raises, the region must not be added to prompt_scanned_regions."""
audit_info = MagicMock()
audit_info.audited_partition = "aws"
audit_info.audited_account = "123456789012"
audit_info.audit_resources = None
regional_client = MagicMock()
regional_client.region = "us-east-1"
paginator = MagicMock()
paginator.paginate.side_effect = Exception("ListPrompts failed")
regional_client.get_paginator.return_value = paginator
bedrock_agent_service = BedrockAgent(audit_info)
bedrock_agent_service.regional_clients = {"us-east-1": regional_client}
bedrock_agent_service.prompts = {}
bedrock_agent_service.prompt_scanned_regions = set()
bedrock_agent_service._list_prompts(regional_client)
assert bedrock_agent_service.prompts == {}
assert bedrock_agent_service.prompt_scanned_regions == set()
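A minimal sketch of the collection method these tests target, assuming the same paginator call they mock (error handling and the stored value are assumptions; the real code builds a model and uses the service's logger):

def _list_prompts(self, regional_client):
    try:
        paginator = regional_client.get_paginator("list_prompts")
        for page in paginator.paginate():
            for summary in page.get("promptSummaries", []):
                self.prompts[summary["arn"]] = summary
        # Mark the region scanned only after the listing succeeds, so failed
        # regions produce no findings downstream.
        self.prompt_scanned_regions.add(regional_client.region)
    except Exception as error:
        logger.error(f"{regional_client.region}: {error.__class__.__name__}: {error}")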
@@ -7,6 +7,7 @@ from unittest.mock import MagicMock, patch
import pytest
from prowler.lib.check.models import CheckReportImage
from prowler.providers.common.provider import Provider
from prowler.providers.image.exceptions.exceptions import (
ImageInvalidConfigScannerError,
ImageInvalidNameError,
@@ -20,7 +21,6 @@ from prowler.providers.image.exceptions.exceptions import (
ImageScanError,
ImageTrivyBinaryNotFoundError,
)
from prowler.providers.common.provider import Provider
from prowler.providers.image.image_provider import ImageProvider
from tests.providers.image.image_fixtures import (
SAMPLE_IMAGE_SHA,
@@ -345,6 +345,24 @@ class TestImageProvider:
)
mock_adapter.list_repositories.assert_called_once()
@patch("prowler.providers.image.image_provider.create_registry_adapter")
def test_test_connection_registry_url_with_https_scheme(self, mock_factory):
"""Registry URL with https:// scheme is normalised before adapter creation."""
mock_adapter = MagicMock()
mock_adapter.list_repositories.return_value = ["repo1"]
mock_factory.return_value = mock_adapter
result = ImageProvider.test_connection(image="https://my-registry.example.com")
assert result.is_connected is True
mock_factory.assert_called_once_with(
registry_url="my-registry.example.com",
username=None,
password=None,
token=None,
)
mock_adapter.list_repositories.assert_called_once()
def test_build_status_extended(self):
"""Test status message content for different finding types."""
provider = _make_provider()
@@ -659,6 +677,27 @@ class TestImageProviderRegistryAuth:
assert "Docker login" in output
class TestStripScheme:
@pytest.mark.parametrize(
"raw,expected",
[
("https://my-registry.example.com", "my-registry.example.com"),
("http://my-registry.example.com", "my-registry.example.com"),
("HTTPS://My-Registry.Example.Com", "My-Registry.Example.Com"),
("Http://localhost:5000", "localhost:5000"),
("my-registry.example.com", "my-registry.example.com"),
("https://", ""),
("https://https://nested.example.com", "https://nested.example.com"),
(
"ftp://not-a-supported-scheme.example.com",
"ftp://not-a-supported-scheme.example.com",
),
],
)
def test_strip_scheme(self, raw, expected):
assert ImageProvider._strip_scheme(raw) == expected
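These cases pin the behaviour precisely: strip exactly one leading http(s) scheme, case-insensitively, and leave everything else (including unsupported schemes like ftp://) untouched. A sketch that satisfies the whole table:

@staticmethod
def _strip_scheme(image_uid: str) -> str:
    # Remove a single leading http(s):// prefix, preserving the case of
    # whatever follows it.
    for prefix in ("https://", "http://"):
        if image_uid.lower().startswith(prefix):
            return image_uid[len(prefix):]
    return image_uid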
class TestExtractRegistry:
def test_docker_hub_simple(self):
assert ImageProvider._extract_registry("alpine:3.18") is None
@@ -698,6 +737,24 @@ class TestExtractRegistry:
def test_bare_image_name(self):
assert ImageProvider._extract_registry("nginx") is None
def test_https_scheme_bare_hostname_returns_none(self):
"""Bare scheme-prefixed hostname has no image path, so no registry is extracted."""
assert (
ImageProvider._extract_registry("https://my-registry.example.com") is None
)
def test_http_scheme_with_port_stripped(self):
assert (
ImageProvider._extract_registry("http://localhost:5000/myimage:latest")
== "localhost:5000"
)
def test_https_scheme_with_path_stripped(self):
assert (
ImageProvider._extract_registry("https://ghcr.io/org/image:tag")
== "ghcr.io"
)
class TestIsRegistryUrl:
def test_bare_ecr_hostname(self):
@@ -728,6 +785,16 @@ class TestIsRegistryUrl:
def test_dockerhub_namespace(self):
assert not ImageProvider._is_registry_url("library/alpine")
def test_https_scheme_bare_hostname(self):
assert ImageProvider._is_registry_url("https://my-registry.example.com")
def test_http_scheme_bare_hostname_with_port(self):
assert ImageProvider._is_registry_url("http://my-registry.example.com:5000")
def test_https_scheme_image_reference_not_registry(self):
"""A scheme-prefixed full image reference is still an image, not a registry URL."""
assert not ImageProvider._is_registry_url("https://ghcr.io/myorg/repo:tag")
class TestTestRegistryConnection:
@patch("prowler.providers.image.image_provider.create_registry_adapter")
@@ -6,90 +6,92 @@ from prowler.providers.kubernetes.services.rbac.rbac_service import Rule
class TestCheckRolePermissions:
def test_is_rule_allowing_permissions(self):
# Define some sample rules, resources, and verbs for testing
rules = [
# Rule 1: Allows 'get' and 'list' on 'pods' and 'services'
Rule(resources=["pods", "services"], verbs=["get", "list"]),
# Rule 2: Allows 'create' and 'delete' on 'deployments'
Rule(resources=["deployments"], verbs=["create", "delete"]),
]
resources = ["pods", "deployments"]
verbs = ["get", "create"]
assert is_rule_allowing_permissions(rules, resources, verbs)
assert is_rule_allowing_permissions(
rules, ["pods", "deployments"], ["get", "create"]
)
def test_no_permissions(self):
# Test when there are no rules
rules = []
resources = ["pods", "deployments"]
verbs = ["get", "create"]
assert not is_rule_allowing_permissions(rules, resources, verbs)
assert not is_rule_allowing_permissions([], ["pods"], ["get"])
def test_no_matching_rules(self):
# Test when there are rules, but none match the specified resources and verbs
rules = [
Rule(resources=["services"], verbs=["get", "list"]),
Rule(resources=["pods"], verbs=["create", "delete"]),
]
resources = ["deployments", "configmaps"]
verbs = ["get", "create"]
assert not is_rule_allowing_permissions(rules, resources, verbs)
assert not is_rule_allowing_permissions(
rules, ["deployments", "configmaps"], ["get", "create"]
)
def test_empty_rules(self):
# Test when the rules list is empty
rules = []
resources = ["pods", "deployments"]
verbs = ["get", "create"]
assert not is_rule_allowing_permissions(rules, resources, verbs)
assert not is_rule_allowing_permissions([], ["pods"], ["get"])
def test_empty_resources_and_verbs(self):
# Test when resources and verbs are empty lists
rules = [
Rule(resources=["pods"], verbs=["get"]),
Rule(resources=["services"], verbs=["list"]),
]
resources = []
verbs = []
assert not is_rule_allowing_permissions(rules, resources, verbs)
rules = [Rule(resources=["pods"], verbs=["get"])]
assert not is_rule_allowing_permissions(rules, [], [])
def test_matching_rule_with_empty_resources_or_verbs(self):
# Test when a rule matches, but either resources or verbs are empty
rules = [Rule(resources=["pods"], verbs=["get"])]
assert not is_rule_allowing_permissions(rules, [], ["get"])
assert not is_rule_allowing_permissions(rules, ["pods"], [])
def test_rule_with_non_matching_api_group(self):
rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=["apps"])]
assert not is_rule_allowing_permissions(rules, ["pods"], ["get"])
def test_rule_with_matching_api_group(self):
rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=[""])]
assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
def test_default_api_group_is_core(self):
rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=None)]
assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
def test_rule_with_empty_api_groups_does_not_match_non_core_request(self):
rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=None)]
assert not is_rule_allowing_permissions(
rules, ["pods"], ["get"], ["admissionregistration.k8s.io"]
)
def test_non_core_rule_does_not_match_without_api_groups_argument(self):
rules = [
Rule(resources=["pods"], verbs=["get"]),
Rule(resources=["services"], verbs=["list"]),
Rule(
resources=["validatingwebhookconfigurations"],
verbs=["create"],
apiGroups=["admissionregistration.k8s.io"],
),
]
resources = []
verbs = ["get"]
assert not is_rule_allowing_permissions(
rules, ["validatingwebhookconfigurations"], ["create"]
)
assert not is_rule_allowing_permissions(rules, resources, verbs)
resources = ["pods"]
verbs = []
assert not is_rule_allowing_permissions(rules, resources, verbs)
def test_rule_with_ignored_api_groups(self):
# Test when a rule has apiGroups that are not relevant
def test_explicit_non_core_api_group(self):
rules = [
Rule(resources=["pods"], verbs=["get"], apiGroups=["test"]),
Rule(resources=["services"], verbs=["list"], apiGroups=["test2"]),
Rule(
resources=["validatingwebhookconfigurations"],
verbs=["create"],
apiGroups=["admissionregistration.k8s.io"],
),
]
resources = ["pods"]
verbs = ["get"]
assert is_rule_allowing_permissions(
rules,
["validatingwebhookconfigurations"],
["create"],
["admissionregistration.k8s.io"],
)
assert not is_rule_allowing_permissions(rules, resources, verbs)
def test_rule_with_wildcard_api_group(self):
rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=["*"])]
assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
assert is_rule_allowing_permissions(rules, ["pods"], ["get"], ["apps"])
def test_rule_with_relevant_api_groups(self):
# Test when a rule has apiGroups that are relevant
rules = [
Rule(resources=["pods"], verbs=["get"], apiGroups=["", "v1"]),
Rule(resources=["services"], verbs=["list"], apiGroups=["test2"]),
]
resources = ["pods"]
verbs = ["get"]
def test_rule_with_wildcard_resources(self):
rules = [Rule(resources=["*"], verbs=["get"], apiGroups=[""])]
assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
assert is_rule_allowing_permissions(rules, resources, verbs)
def test_rule_with_wildcard_verbs(self):
rules = [Rule(resources=["pods"], verbs=["*"], apiGroups=[""])]
assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
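The API-group tests above fix a precise matching contract: a missing apiGroups on either the rule or the request defaults to the core group (""), "*" matches any group, resource, or verb, and a rule allows the request only when all three dimensions intersect, so empty requested resources or verbs never match a non-wildcard rule. A hedged sketch of that contract (not the real rbac_service implementation):

def is_rule_allowing_permissions(rules, resources, verbs, api_groups=None):
    # Both sides default to the core API group ("") when unspecified.
    requested_groups = api_groups if api_groups is not None else [""]
    for rule in rules:
        rule_groups = rule.apiGroups if rule.apiGroups is not None else [""]
        group_match = "*" in rule_groups or any(
            group in rule_groups for group in requested_groups
        )
        resource_match = "*" in rule.resources or any(
            resource in rule.resources for resource in resources
        )
        verb_match = "*" in rule.verbs or any(verb in rule.verbs for verb in verbs)
        if group_match and resource_match and verb_match:
            return True
    return False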
@@ -0,0 +1,29 @@
from unittest import mock
from prowler.providers.vercel.lib.service.service import VercelService
class TestVercelService:
def test_get_returns_none_and_logs_info_on_expected_403(self):
service = VercelService.__new__(VercelService)
service.audit_config = {"max_retries": 0}
service.service = "security"
service._team_id = None
service._base_url = "https://api.vercel.com"
response = mock.MagicMock()
response.status_code = 403
service._http_session = mock.MagicMock()
service._http_session.get.return_value = response
with mock.patch(
"prowler.providers.vercel.lib.service.service.logger"
) as logger_mock:
result = service._get("/v1/security/firewall/config/active")
assert result is None
logger_mock.info.assert_called_once_with(
"security - Access denied for /v1/security/firewall/config/active (403). "
"This may be caused by plan or permission restrictions."
)
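The test exercises a single branch: an expected 403 from a plan-gated endpoint is logged at info level with the service name and path, and the call returns None instead of raising. A minimal sketch of that branch, assuming a requests-style session (the real _get presumably also covers retries, team scoping, and other status codes):

from prowler.lib.logger import logger  # Prowler's shared logger


def _get(self, path: str):
    response = self._http_session.get(f"{self._base_url}{path}")
    if response.status_code == 403:
        # Plan or permission gating is expected here: record it, return no data.
        logger.info(
            f"{self.service} - Access denied for {path} (403). "
            "This may be caused by plan or permission restrictions."
        )
        return None
    response.raise_for_status()
    return response.json()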
@@ -142,3 +142,41 @@ class Test_project_password_protection_enabled:
== f"Project {PROJECT_NAME} does not have password protection configured for deployments."
)
assert result[0].team_id == TEAM_ID
def test_no_password_protection_hobby_plan(self):
project_client = mock.MagicMock()
project_client.projects = {
PROJECT_ID: VercelProject(
id=PROJECT_ID,
name=PROJECT_NAME,
team_id=TEAM_ID,
billing_plan="hobby",
password_protection=None,
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_vercel_provider(billing_plan="hobby"),
),
mock.patch(
"prowler.providers.vercel.services.project.project_password_protection_enabled.project_password_protection_enabled.project_client",
new=project_client,
),
):
from prowler.providers.vercel.services.project.project_password_protection_enabled.project_password_protection_enabled import (
project_password_protection_enabled,
)
check = project_password_protection_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == PROJECT_ID
assert result[0].resource_name == PROJECT_NAME
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Project {PROJECT_NAME} does not have password protection configured for deployments. This may be expected because password protection is not available on the Vercel Hobby plan."
)
assert result[0].team_id == TEAM_ID
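The hobby-plan variant asserts the same FAIL outcome with an explanatory suffix appended to the base message. A hypothetical distillation of that pattern, with with_hobby_plan_note and feature_note as illustrative names (the actual checks may build the string inline):

from typing import Optional

PROJECT_NAME = "my-project"  # placeholder for the fixture value


def with_hobby_plan_note(
    message: str, billing_plan: Optional[str], feature_note: str
) -> str:
    # Append the "may be expected" note only for plan-gated hobby accounts.
    if billing_plan == "hobby":
        return f"{message} This may be expected because {feature_note}."
    return message


status_extended = with_hobby_plan_note(
    f"Project {PROJECT_NAME} does not have password protection configured for deployments.",
    "hobby",
    "password protection is not available on the Vercel Hobby plan",
)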
@@ -149,3 +149,41 @@ class Test_project_production_deployment_protection_enabled:
== f"Project {PROJECT_NAME} does not have deployment protection enabled on production deployments."
)
assert result[0].team_id == TEAM_ID
def test_protection_null_hobby_plan(self):
project_client = mock.MagicMock()
project_client.projects = {
PROJECT_ID: VercelProject(
id=PROJECT_ID,
name=PROJECT_NAME,
team_id=TEAM_ID,
billing_plan="hobby",
production_deployment_protection=None,
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_vercel_provider(billing_plan="hobby"),
),
mock.patch(
"prowler.providers.vercel.services.project.project_production_deployment_protection_enabled.project_production_deployment_protection_enabled.project_client",
new=project_client,
),
):
from prowler.providers.vercel.services.project.project_production_deployment_protection_enabled.project_production_deployment_protection_enabled import (
project_production_deployment_protection_enabled,
)
check = project_production_deployment_protection_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == PROJECT_ID
assert result[0].resource_name == PROJECT_NAME
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Project {PROJECT_NAME} does not have deployment protection enabled on production deployments. This may be expected because protecting production deployments is not available on the Vercel Hobby plan."
)
assert result[0].team_id == TEAM_ID
@@ -5,6 +5,7 @@ from tests.providers.vercel.vercel_fixtures import (
PROJECT_ID,
PROJECT_NAME,
TEAM_ID,
USER_ID,
set_mocked_vercel_provider,
)
@@ -43,3 +44,69 @@ class TestProjectService:
"ai_bots": {"active": False, "action": "deny"},
}
assert project.bot_id_enabled is True
def test_list_projects_uses_scoped_team_billing_plan(self):
service = Project.__new__(Project)
service.provider = set_mocked_vercel_provider(
billing_plan="enterprise",
team_billing_plan="hobby",
)
service.projects = {}
service._paginate = mock.MagicMock(
return_value=[
{
"id": PROJECT_ID,
"name": PROJECT_NAME,
"accountId": TEAM_ID,
}
]
)
service._list_projects()
project = service.projects[PROJECT_ID]
assert project.billing_plan == "hobby"
def test_list_projects_uses_user_billing_plan_for_user_scoped_project(self):
service = Project.__new__(Project)
service.provider = set_mocked_vercel_provider(
billing_plan="enterprise",
team_billing_plan="hobby",
)
service.projects = {}
service._paginate = mock.MagicMock(
return_value=[
{
"id": PROJECT_ID,
"name": PROJECT_NAME,
"accountId": USER_ID,
}
]
)
service._list_projects()
project = service.projects[PROJECT_ID]
assert project.billing_plan == "enterprise"
def test_list_projects_does_not_guess_billing_plan_without_scope(self):
service = Project.__new__(Project)
service.provider = set_mocked_vercel_provider(
billing_plan="enterprise",
team_billing_plan="hobby",
)
service.provider.session.team_id = None
service.projects = {}
service._paginate = mock.MagicMock(
return_value=[
{
"id": PROJECT_ID,
"name": PROJECT_NAME,
}
]
)
service._list_projects()
project = service.projects[PROJECT_ID]
assert project.billing_plan is None
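These three tests pin down the scoping rule for billing plans: a project whose accountId matches the scoped team takes the team's plan, one matching the authenticated user takes the user's plan, and anything else is left unresolved rather than guessed. A sketch of that resolution, with resolve_billing_plan as an illustrative name and field names taken from the mocked provider fixture:

def resolve_billing_plan(provider, project_data: dict):
    account_id = project_data.get("accountId")
    identity = provider.identity
    if (
        provider.session.team_id
        and identity.team is not None
        and account_id == identity.team.id
    ):
        return identity.team.billing_plan  # team-scoped project: team's plan
    if account_id == identity.user_id:
        return identity.billing_plan  # user-scoped project: user's plan
    return None  # unknown scope: do not guess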
@@ -105,3 +105,41 @@ class Test_project_skew_protection_enabled:
== f"Project {PROJECT_NAME} does not have skew protection enabled, which may cause version mismatches during deployments."
)
assert result[0].team_id == TEAM_ID
def test_skew_protection_disabled_hobby_plan(self):
project_client = mock.MagicMock()
project_client.projects = {
PROJECT_ID: VercelProject(
id=PROJECT_ID,
name=PROJECT_NAME,
team_id=TEAM_ID,
billing_plan="hobby",
skew_protection=False,
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_vercel_provider(billing_plan="hobby"),
),
mock.patch(
"prowler.providers.vercel.services.project.project_skew_protection_enabled.project_skew_protection_enabled.project_client",
new=project_client,
),
):
from prowler.providers.vercel.services.project.project_skew_protection_enabled.project_skew_protection_enabled import (
project_skew_protection_enabled,
)
check = project_skew_protection_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == PROJECT_ID
assert result[0].resource_name == PROJECT_NAME
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Project {PROJECT_NAME} does not have skew protection enabled, which may cause version mismatches during deployments. This may be expected because skew protection is not available on the Vercel Hobby plan."
)
assert result[0].team_id == TEAM_ID
@@ -111,3 +111,41 @@ class Test_security_custom_rules_configured:
== f"Project {PROJECT_NAME} ({PROJECT_ID}) does not have any custom firewall rules configured."
)
assert result[0].team_id == TEAM_ID
def test_custom_rules_status_unavailable_hobby_plan(self):
security_client = mock.MagicMock()
security_client.firewall_configs = {
PROJECT_ID: VercelFirewallConfig(
project_id=PROJECT_ID,
project_name=PROJECT_NAME,
team_id=TEAM_ID,
billing_plan="hobby",
firewall_config_accessible=False,
managed_rulesets=None,
id=PROJECT_ID,
name=PROJECT_NAME,
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_vercel_provider(),
),
mock.patch(
"prowler.providers.vercel.services.security.security_custom_rules_configured.security_custom_rules_configured.security_client",
new=security_client,
),
):
from prowler.providers.vercel.services.security.security_custom_rules_configured.security_custom_rules_configured import (
security_custom_rules_configured,
)
check = security_custom_rules_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "MANUAL"
assert (
result[0].status_extended
== f"Project {PROJECT_NAME} ({PROJECT_ID}) could not be assessed for custom firewall rules because the firewall configuration endpoint was not accessible. Manual verification is required. This may be expected because custom firewall rules are not available on the Vercel Hobby plan."
)
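When the firewall configuration endpoint cannot be read at all, the check neither passes nor fails: it reports MANUAL, and on the Hobby plan it explains why the gap is expected. A hedged distillation of that fallback, with assess_custom_rules as an illustrative name and field names from the VercelFirewallConfig fixture (the accessible PASS/FAIL path is elided):

def assess_custom_rules(config):
    if not config.firewall_config_accessible:
        extended = (
            f"Project {config.project_name} ({config.project_id}) could not be "
            "assessed for custom firewall rules because the firewall "
            "configuration endpoint was not accessible. Manual verification "
            "is required."
        )
        if config.billing_plan == "hobby":
            extended += (
                " This may be expected because custom firewall rules are not "
                "available on the Vercel Hobby plan."
            )
        return "MANUAL", extended
    return None  # normal PASS/FAIL evaluation elided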
@@ -111,3 +111,41 @@ class Test_security_ip_blocking_rules_configured:
== f"Project {PROJECT_NAME} ({PROJECT_ID}) does not have any IP blocking rules configured."
)
assert result[0].team_id == TEAM_ID
def test_ip_rules_status_unavailable_hobby_plan(self):
security_client = mock.MagicMock()
security_client.firewall_configs = {
PROJECT_ID: VercelFirewallConfig(
project_id=PROJECT_ID,
project_name=PROJECT_NAME,
team_id=TEAM_ID,
billing_plan="hobby",
firewall_config_accessible=False,
managed_rulesets=None,
id=PROJECT_ID,
name=PROJECT_NAME,
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_vercel_provider(),
),
mock.patch(
"prowler.providers.vercel.services.security.security_ip_blocking_rules_configured.security_ip_blocking_rules_configured.security_client",
new=security_client,
),
):
from prowler.providers.vercel.services.security.security_ip_blocking_rules_configured.security_ip_blocking_rules_configured import (
security_ip_blocking_rules_configured,
)
check = security_ip_blocking_rules_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "MANUAL"
assert (
result[0].status_extended
== f"Project {PROJECT_NAME} ({PROJECT_ID}) could not be assessed for IP blocking rules because the firewall configuration endpoint was not accessible. Manual verification is required. This may be expected because IP blocking rules are not available on the Vercel Hobby plan."
)
@@ -121,6 +121,7 @@ class Test_security_managed_rulesets_enabled:
project_id=PROJECT_ID,
project_name=PROJECT_NAME,
team_id=TEAM_ID,
firewall_config_accessible=False,
firewall_enabled=False,
managed_rulesets=None,
id=PROJECT_ID,
@@ -150,6 +151,45 @@ class Test_security_managed_rulesets_enabled:
assert result[0].status == "MANUAL"
assert (
result[0].status_extended
== f"Project {PROJECT_NAME} ({PROJECT_ID}) could not be assessed for managed rulesets. Enterprise plan required to access this feature."
== f"Project {PROJECT_NAME} ({PROJECT_ID}) could not be assessed for managed rulesets because the firewall configuration endpoint was not accessible. Manual verification is required."
)
assert result[0].team_id == TEAM_ID
def test_managed_rulesets_plan_gated_non_enterprise_scope(self):
security_client = mock.MagicMock()
security_client.firewall_configs = {
PROJECT_ID: VercelFirewallConfig(
project_id=PROJECT_ID,
project_name=PROJECT_NAME,
team_id=TEAM_ID,
billing_plan="pro",
firewall_config_accessible=False,
firewall_enabled=False,
managed_rulesets=None,
id=PROJECT_ID,
name=PROJECT_NAME,
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_vercel_provider(),
),
mock.patch(
"prowler.providers.vercel.services.security.security_managed_rulesets_enabled.security_managed_rulesets_enabled.security_client",
new=security_client,
),
):
from prowler.providers.vercel.services.security.security_managed_rulesets_enabled.security_managed_rulesets_enabled import (
security_managed_rulesets_enabled,
)
check = security_managed_rulesets_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "MANUAL"
assert (
result[0].status_extended
== f"Project {PROJECT_NAME} ({PROJECT_ID}) could not be assessed for managed rulesets because the firewall configuration endpoint was not accessible. Manual verification is required. This may be expected because some managed WAF rulesets, including the OWASP Core Ruleset, are only available on Vercel Enterprise plans."
)
@@ -111,3 +111,41 @@ class Test_security_rate_limiting_configured:
== f"Project {PROJECT_NAME} ({PROJECT_ID}) does not have any rate limiting rules configured."
)
assert result[0].team_id == TEAM_ID
def test_rate_limiting_status_unavailable_hobby_plan(self):
security_client = mock.MagicMock()
security_client.firewall_configs = {
PROJECT_ID: VercelFirewallConfig(
project_id=PROJECT_ID,
project_name=PROJECT_NAME,
team_id=TEAM_ID,
billing_plan="hobby",
firewall_config_accessible=False,
managed_rulesets=None,
id=PROJECT_ID,
name=PROJECT_NAME,
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_vercel_provider(),
),
mock.patch(
"prowler.providers.vercel.services.security.security_rate_limiting_configured.security_rate_limiting_configured.security_client",
new=security_client,
),
):
from prowler.providers.vercel.services.security.security_rate_limiting_configured.security_rate_limiting_configured import (
security_rate_limiting_configured,
)
check = security_rate_limiting_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "MANUAL"
assert (
result[0].status_extended
== f"Project {PROJECT_NAME} ({PROJECT_ID}) could not be assessed for rate limiting rules because the firewall configuration endpoint was not accessible. Manual verification is required. This may be expected because rate limiting rules are not available on the Vercel Hobby plan."
)
@@ -7,7 +7,12 @@ from tests.providers.vercel.vercel_fixtures import PROJECT_ID, PROJECT_NAME, TEA
class TestSecurityService:
def test_fetch_firewall_config_reads_active_version_and_normalizes_response(self):
project = VercelProject(id=PROJECT_ID, name=PROJECT_NAME, team_id=TEAM_ID)
project = VercelProject(
id=PROJECT_ID,
name=PROJECT_NAME,
team_id=TEAM_ID,
billing_plan="pro",
)
service = Security.__new__(Security)
service.firewall_configs = {}
@@ -89,6 +94,7 @@ class TestSecurityService:
)
config = service.firewall_configs[PROJECT_ID]
assert config.billing_plan == "pro"
assert config.firewall_enabled is True
assert config.managed_rulesets == {"owasp": {"active": True, "action": "deny"}}
assert [rule["id"] for rule in config.custom_rules] == ["rule-custom"]
@@ -113,3 +113,83 @@ class Test_security_waf_enabled:
== f"Project {PROJECT_NAME} ({PROJECT_ID}) does not have the Web Application Firewall enabled."
)
assert result[0].team_id == TEAM_ID
def test_waf_status_unavailable(self):
security_client = mock.MagicMock()
security_client.firewall_configs = {
PROJECT_ID: VercelFirewallConfig(
project_id=PROJECT_ID,
project_name=PROJECT_NAME,
team_id=TEAM_ID,
firewall_config_accessible=False,
firewall_enabled=False,
managed_rulesets=None,
id=PROJECT_ID,
name=PROJECT_NAME,
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_vercel_provider(),
),
mock.patch(
"prowler.providers.vercel.services.security.security_waf_enabled.security_waf_enabled.security_client",
new=security_client,
),
):
from prowler.providers.vercel.services.security.security_waf_enabled.security_waf_enabled import (
security_waf_enabled,
)
check = security_waf_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == PROJECT_ID
assert result[0].resource_name == PROJECT_NAME
assert result[0].status == "MANUAL"
assert (
result[0].status_extended
== f"Project {PROJECT_NAME} ({PROJECT_ID}) could not be checked for WAF status because the firewall configuration endpoint was not accessible. Manual verification is required."
)
assert result[0].team_id == TEAM_ID
def test_waf_status_unavailable_hobby_plan(self):
security_client = mock.MagicMock()
security_client.firewall_configs = {
PROJECT_ID: VercelFirewallConfig(
project_id=PROJECT_ID,
project_name=PROJECT_NAME,
team_id=TEAM_ID,
billing_plan="hobby",
firewall_config_accessible=False,
firewall_enabled=False,
managed_rulesets=None,
id=PROJECT_ID,
name=PROJECT_NAME,
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_vercel_provider(),
),
mock.patch(
"prowler.providers.vercel.services.security.security_waf_enabled.security_waf_enabled.security_client",
new=security_client,
),
):
from prowler.providers.vercel.services.security.security_waf_enabled.security_waf_enabled import (
security_waf_enabled,
)
check = security_waf_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "MANUAL"
assert (
result[0].status_extended
== f"Project {PROJECT_NAME} ({PROJECT_ID}) could not be checked for WAF status because the firewall configuration endpoint was not accessible. Manual verification is required. This may be expected because the Web Application Firewall is not available on the Vercel Hobby plan."
)
@@ -105,3 +105,41 @@ class Test_team_directory_sync_enabled:
== f"Team {TEAM_NAME} does not have directory sync (SCIM) enabled. User provisioning and deprovisioning must be managed manually."
)
assert result[0].team_id == ""
def test_directory_sync_disabled_pro_plan(self):
team_client = mock.MagicMock()
team_client.teams = {
TEAM_ID: VercelTeam(
id=TEAM_ID,
name=TEAM_NAME,
slug=TEAM_SLUG,
directory_sync_enabled=False,
billing_plan="pro",
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_vercel_provider(),
),
mock.patch(
"prowler.providers.vercel.services.team.team_directory_sync_enabled.team_directory_sync_enabled.team_client",
new=team_client,
),
):
from prowler.providers.vercel.services.team.team_directory_sync_enabled.team_directory_sync_enabled import (
team_directory_sync_enabled,
)
check = team_directory_sync_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == TEAM_ID
assert result[0].resource_name == TEAM_NAME
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Team {TEAM_NAME} does not have directory sync (SCIM) enabled. User provisioning and deprovisioning must be managed manually. This may be expected because directory sync (SCIM) is only available on Vercel Enterprise plans."
)
assert result[0].team_id == ""
@@ -106,3 +106,42 @@ class Test_team_saml_sso_enabled:
== f"Team {TEAM_NAME} does not have SAML SSO enabled."
)
assert result[0].team_id == ""
def test_saml_disabled_hobby_plan(self):
team_client = mock.MagicMock()
team_client.teams = {
TEAM_ID: VercelTeam(
id=TEAM_ID,
name=TEAM_NAME,
slug=TEAM_SLUG,
saml=SAMLConfig(status="disabled", enforced=False),
billing_plan="hobby",
members=[],
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_vercel_provider(),
),
mock.patch(
"prowler.providers.vercel.services.team.team_saml_sso_enabled.team_saml_sso_enabled.team_client",
new=team_client,
),
):
from prowler.providers.vercel.services.team.team_saml_sso_enabled.team_saml_sso_enabled import (
team_saml_sso_enabled,
)
check = team_saml_sso_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == TEAM_ID
assert result[0].resource_name == TEAM_NAME
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Team {TEAM_NAME} does not have SAML SSO enabled. This may be expected because SAML SSO is not available on the Vercel Hobby plan."
)
assert result[0].team_id == ""
@@ -142,3 +142,41 @@ class Test_team_saml_sso_enforced:
== f"Team {TEAM_NAME} does not have SAML SSO enforced."
)
assert result[0].team_id == ""
def test_saml_disabled_hobby_plan(self):
team_client = mock.MagicMock()
team_client.teams = {
TEAM_ID: VercelTeam(
id=TEAM_ID,
name=TEAM_NAME,
slug=TEAM_SLUG,
saml=SAMLConfig(status="disabled", enforced=False),
billing_plan="hobby",
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_vercel_provider(),
),
mock.patch(
"prowler.providers.vercel.services.team.team_saml_sso_enforced.team_saml_sso_enforced.team_client",
new=team_client,
),
):
from prowler.providers.vercel.services.team.team_saml_sso_enforced.team_saml_sso_enforced import (
team_saml_sso_enforced,
)
check = team_saml_sso_enforced()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == TEAM_ID
assert result[0].resource_name == TEAM_NAME
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Team {TEAM_NAME} does not have SAML SSO enforced. This may be expected because SAML SSO is not available on the Vercel Hobby plan."
)
assert result[0].team_id == ""
@@ -33,6 +33,8 @@ def set_mocked_vercel_provider(
team_id: str = TEAM_ID,
identity: VercelIdentityInfo = None,
audit_config: dict = None,
billing_plan: str = None,
team_billing_plan: str = None,
):
"""Create a mocked VercelProvider for testing."""
provider = MagicMock()
@@ -42,15 +44,22 @@ def set_mocked_vercel_provider(
team_id=team_id,
http_session=MagicMock(),
)
resolved_team_billing_plan = (
team_billing_plan if team_billing_plan is not None else billing_plan
)
team_info = VercelTeamInfo(
id=TEAM_ID,
name=TEAM_NAME,
slug=TEAM_SLUG,
billing_plan=resolved_team_billing_plan,
)
provider.identity = identity or VercelIdentityInfo(
user_id=USER_ID,
username=USERNAME,
email=USER_EMAIL,
team=VercelTeamInfo(
id=TEAM_ID,
name=TEAM_NAME,
slug=TEAM_SLUG,
),
billing_plan=billing_plan,
team=team_info,
teams=[team_info],
)
provider.audit_config = audit_config or {"max_retries": 3}
provider.fixer_config = {}
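A short usage sketch of the extended fixture: team_billing_plan lets a test model a user on one plan whose scoped team sits on another, which the Project service tests above rely on; when it is omitted, the team falls back to billing_plan.

provider = set_mocked_vercel_provider(
    billing_plan="enterprise",
    team_billing_plan="hobby",
)
assert provider.identity.billing_plan == "enterprise"
assert provider.identity.team.billing_plan == "hobby"  # scoped team keeps its own plan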
@@ -0,0 +1,97 @@
from prowler.lib.check.models import CheckMetadata
class TestVercelMetadata:
EXPECTED_CATEGORIES = {
"authentication_no_stale_tokens": [
"trust-boundaries",
"vercel-hobby-plan",
],
"authentication_token_not_expired": [
"trust-boundaries",
"vercel-hobby-plan",
],
"deployment_production_uses_stable_target": [
"trust-boundaries",
"vercel-hobby-plan",
],
"domain_dns_properly_configured": [
"trust-boundaries",
"vercel-hobby-plan",
],
"domain_ssl_certificate_valid": ["encryption", "vercel-hobby-plan"],
"domain_verified": ["trust-boundaries", "vercel-hobby-plan"],
"project_auto_expose_system_env_disabled": [
"trust-boundaries",
"vercel-hobby-plan",
],
"project_deployment_protection_enabled": [
"internet-exposed",
"vercel-hobby-plan",
],
"project_directory_listing_disabled": [
"internet-exposed",
"vercel-hobby-plan",
],
"project_environment_no_overly_broad_target": [
"secrets",
"vercel-hobby-plan",
],
"project_environment_no_secrets_in_plain_type": [
"secrets",
"vercel-hobby-plan",
],
"project_environment_production_vars_not_in_preview": [
"secrets",
"vercel-hobby-plan",
],
"project_git_fork_protection_enabled": [
"internet-exposed",
"vercel-hobby-plan",
],
"project_password_protection_enabled": [
"internet-exposed",
"vercel-pro-plan",
],
"project_production_deployment_protection_enabled": [
"internet-exposed",
"vercel-pro-plan",
],
"project_skew_protection_enabled": ["resilience", "vercel-pro-plan"],
"security_custom_rules_configured": [
"internet-exposed",
"vercel-pro-plan",
],
"security_ip_blocking_rules_configured": [
"internet-exposed",
"vercel-pro-plan",
],
"security_managed_rulesets_enabled": [
"internet-exposed",
"vercel-hobby-plan",
],
"security_rate_limiting_configured": [
"internet-exposed",
"vercel-pro-plan",
],
"security_waf_enabled": ["internet-exposed", "vercel-pro-plan"],
"team_directory_sync_enabled": [
"trust-boundaries",
"vercel-enterprise-plan",
],
"team_member_role_least_privilege": [
"trust-boundaries",
"vercel-hobby-plan",
],
"team_no_stale_invitations": ["trust-boundaries", "vercel-hobby-plan"],
"team_saml_sso_enabled": ["trust-boundaries", "vercel-pro-plan"],
"team_saml_sso_enforced": ["trust-boundaries", "vercel-pro-plan"],
}
def test_vercel_checks_use_legacy_and_plan_categories(self):
vercel_metadata = CheckMetadata.get_bulk(provider="vercel")
assert set(vercel_metadata) == set(self.EXPECTED_CATEGORIES)
for check_id, expected_categories in self.EXPECTED_CATEGORIES.items():
assert vercel_metadata[check_id].Categories == expected_categories
@@ -7,6 +7,7 @@
# testing
/coverage
__screenshots__/
# next.js
/.next/
@@ -28,6 +29,9 @@ yarn-error.log*
.env*.local
.env
# Claude Code local settings
.claude/
# vercel
.vercel
@@ -2,6 +2,27 @@
All notable changes to the **Prowler UI** are documented in this file.
## Unreleased
### 🚀 Added
- Browser test mode using Vitest with the Playwright provider, with initial coverage of the Attack Paths page and a new `pnpm test:browser` script wired into CI
### 🔄 Changed
- Attack Paths graph: extract shared primitives across `FindingNode`, `ResourceNode`, and `InternetNode` (hidden handles, label truncation, fill/border resolution) without forcing a generic node renderer [(#10705)](https://github.com/prowler-cloud/prowler/pull/10705)
---
## [1.25.2] (Prowler v5.25.2)
### 🔄 Changed
- Compliance cards: progress bar now spans the full card width, the passing-requirements caption sits beside the framework logo under the title, and the ISO 27001 logo asset is recentered within its tile [(#10939)](https://github.com/prowler-cloud/prowler/pull/10939)
- Findings expanded resource rows now drop the redundant cube icons, render Service and Region with the same compact label style as Last seen and Failing for, and reorder columns to Status, Resource, Provider, Severity, then field labels [(#10949)](https://github.com/prowler-cloud/prowler/pull/10949)
---
## [1.25.1] (Prowler v5.25.1)
### 🐞 Fixed
@@ -0,0 +1,231 @@
import { http, HttpResponse } from "msw";
import type { PageFixture } from "@/app/(prowler)/attack-paths/(workflow)/query-builder/attack-paths-page.fixtures";
import type {
AttackPathQueriesResponse,
AttackPathQuery,
AttackPathQueryResult,
AttackPathScan,
AttackPathScansResponse,
QueryResultAttributes,
} from "@/types/attack-paths";
const API = process.env.NEXT_PUBLIC_API_BASE_URL!;
type JsonApiErrorBody = {
errors: Array<{ detail: string; status: string }>;
};
const toScansApiResponse = (
scans: AttackPathScan[],
): AttackPathScansResponse => ({
data: scans,
links: {
first: `${API}/attack-paths-scans?page=1`,
last: `${API}/attack-paths-scans?page=1`,
next: null,
prev: null,
},
});
const toQueriesApiResponse = (
queries: AttackPathQuery[],
): AttackPathQueriesResponse => ({
data: queries,
});
const toQueryResultApiResponse = (
attrs: QueryResultAttributes,
queryId: string,
): AttackPathQueryResult => ({
data: {
type: "attack-paths-query-run-requests",
id: queryId,
attributes: attrs,
},
});
const toErrorBody = (detail: string, status: number): JsonApiErrorBody => ({
errors: [{ detail, status: String(status) }],
});
const toFindingApiResponse = (fx: PageFixture, findingId: string) => {
const findingNode = fx.queryResult?.nodes.find(
(node) => node.id === findingId,
);
const resourceNode = fx.queryResult?.nodes.find((node) =>
fx.queryResult?.relationships?.some(
(rel) =>
(rel.source === node.id && rel.target === findingId) ||
(rel.target === node.id && rel.source === findingId),
),
);
const scan = fx.scans[0];
const providerId = scan?.relationships?.provider?.data?.id ?? "provider-1";
const resourceId = resourceNode?.id ?? "resource-1";
return {
data: {
type: "findings",
id: findingId,
attributes: {
uid: String(findingNode?.properties.id ?? findingId),
delta: null,
status: String(findingNode?.properties.status ?? "FAIL"),
status_extended: "Status extended",
severity: String(findingNode?.properties.severity ?? "critical"),
check_id: "attack_path_check",
muted: false,
muted_reason: null,
check_metadata: {
risk: "High",
notes: "",
checkid: "attack_path_check",
provider: "aws",
severity: String(findingNode?.properties.severity ?? "critical"),
checktype: [],
dependson: [],
relatedto: [],
categories: ["security"],
checktitle: String(
findingNode?.properties.check_title ?? "Attack path finding",
),
compliance: null,
relatedurl: "",
description: "Attack path finding description",
remediation: {
code: { cli: "", other: "", nativeiac: "", terraform: "" },
recommendation: { url: "", text: "Fix the finding" },
},
additionalurls: [],
servicename: String(resourceNode?.properties.service ?? "s3"),
checkaliases: [],
resourcetype: String(resourceNode?.labels[0] ?? "Resource"),
subservicename: "",
resourceidtemplate: "",
},
raw_result: null,
inserted_at: "2026-04-21T10:00:00Z",
updated_at: "2026-04-21T10:05:00Z",
first_seen_at: null,
},
relationships: {
resources: { data: [{ type: "resources", id: resourceId }] },
scan: { data: { type: "scans", id: scan?.id ?? "scan-1" } },
},
},
included: [
{
type: "resources",
id: resourceId,
attributes: {
uid: String(resourceNode?.properties.arn ?? resourceId),
name: String(resourceNode?.properties.name ?? resourceId),
region: "us-east-1",
service: String(resourceNode?.properties.service ?? "s3"),
tags: {},
type: String(resourceNode?.labels[0] ?? "Resource"),
inserted_at: "2026-04-21T10:00:00Z",
updated_at: "2026-04-21T10:05:00Z",
details: null,
partition: null,
},
},
{
type: "scans",
id: scan?.id ?? "scan-1",
attributes: {
name: "Attack path scan",
trigger: "manual",
state: scan?.attributes.state ?? "completed",
unique_resource_count: 1,
progress: scan?.attributes.progress ?? 100,
duration: scan?.attributes.duration ?? 0,
started_at: scan?.attributes.started_at ?? "2026-04-21T10:00:00Z",
inserted_at: scan?.attributes.inserted_at ?? "2026-04-21T10:00:00Z",
completed_at: scan?.attributes.completed_at ?? "2026-04-21T10:05:00Z",
scheduled_at: null,
next_scan_at: "",
},
relationships: {
provider: { data: { type: "providers", id: providerId } },
},
},
{
type: "providers",
id: providerId,
attributes: {
provider: scan?.attributes.provider_type ?? "aws",
uid: scan?.attributes.provider_uid ?? "123456789",
alias: scan?.attributes.provider_alias ?? "Provider",
connection: {
connected: true,
last_checked_at: "2026-04-21T10:00:00Z",
},
inserted_at: "2026-04-21T10:00:00Z",
updated_at: "2026-04-21T10:05:00Z",
},
},
],
};
};
export const handlersForFixture = (fx: PageFixture) => [
http.get(`${API}/attack-paths-scans`, () =>
HttpResponse.json<AttackPathScansResponse>(toScansApiResponse(fx.scans)),
),
http.get<{ scanId: string }>(
`${API}/attack-paths-scans/:scanId/queries`,
() =>
HttpResponse.json<AttackPathQueriesResponse>(
toQueriesApiResponse(fx.queries),
),
),
http.post<{ scanId: string }>(
`${API}/attack-paths-scans/:scanId/queries/run`,
() => {
if (fx.queryError) {
return HttpResponse.json<JsonApiErrorBody>(
toErrorBody(fx.queryError.error, fx.queryError.status),
{ status: fx.queryError.status },
);
}
if (!fx.queryResult) {
return HttpResponse.json<JsonApiErrorBody>(
toErrorBody("No data found", 404),
{ status: 404 },
);
}
return HttpResponse.json<AttackPathQueryResult>(
toQueryResultApiResponse(fx.queryResult, fx.queryId),
);
},
),
http.post<{ scanId: string }>(
`${API}/attack-paths-scans/:scanId/queries/custom`,
() => {
if (fx.queryError) {
return HttpResponse.json<JsonApiErrorBody>(
toErrorBody(fx.queryError.error, fx.queryError.status),
{ status: fx.queryError.status },
);
}
if (!fx.queryResult) {
return HttpResponse.json<JsonApiErrorBody>(
toErrorBody("No data found", 404),
{ status: 404 },
);
}
return HttpResponse.json<AttackPathQueryResult>(
toQueryResultApiResponse(fx.queryResult, fx.queryId),
);
},
),
http.get<{ findingId: string }>(`${API}/findings/:findingId`, ({ params }) =>
HttpResponse.json(toFindingApiResponse(fx, params.findingId)),
),
];
@@ -0,0 +1,13 @@
import type { HttpHandler } from "msw";
/**
* Static handlers shared by every browser test, registered as defaults on
* the worker. Use this list for endpoints whose response doesn't change
* across tests (e.g. `/users/me`, `/tenants/current`, health checks).
*
* Per-domain dynamic handlers that depend on fixture data live in their own
* files alongside this index (e.g. `./attack-paths.ts`) and are imported
* directly by the tests that need them, then wired via
* `worker.use(...handlersForFixture(fx))`.
*/
export const handlers: HttpHandler[] = [];
@@ -0,0 +1,5 @@
import { setupWorker } from "msw/browser";
import { handlers } from "./handlers";
export const worker = setupWorker(...handlers);
@@ -0,0 +1,25 @@
import type { ComponentType, PropsWithChildren, ReactElement } from "react";
import { render as vitestRender } from "vitest-browser-react";
const TestProviders = ({ children }: PropsWithChildren) => <>{children}</>;
type RenderOptions = Parameters<typeof vitestRender>[1];
export function render(ui: ReactElement, options?: RenderOptions) {
const userWrapper = options?.wrapper as
| ComponentType<PropsWithChildren>
| undefined;
const Wrapper = userWrapper
? ({ children }: PropsWithChildren) => {
const Inner = userWrapper;
return (
<TestProviders>
<Inner>{children}</Inner>
</TestProviders>
);
}
: TestProviders;
return vitestRender(ui, { ...options, wrapper: Wrapper });
}

Some files were not shown because too many files have changed in this diff.