Compare commits

...

8 Commits

Author SHA1 Message Date
Pablo F.G 982171a336 chore(precommit): scope zizmor hook to workflows, actions and dependabot
The zizmor pre-commit hook used `files: ^\.github/`, which matched any
file under `.github/` — including issue templates, the labeler config,
CODEOWNERS, etc. zizmor only audits GitHub Actions workflows, composite
actions and Dependabot configs, so on any commit that touched a
non-auditable `.github/` file it failed with `exit code 3`:

    failed to validate input as workflow: input does not match expected
    validation schema
    fatal: no audit was performed
    error: no inputs collected

Narrow the regex to `^\.github/(workflows|actions)/.+\.ya?ml$|^\.github/dependabot\.ya?ml$`
so zizmor only inspects what it can audit. Verified locally: the hook
skips on `.github/ISSUE_TEMPLATE/*.yml` and the manifest file itself,
and still runs (and passes) on `.github/workflows/api-bump-version.yml`.
2026-05-05 10:05:57 +02:00
Pepe Fagoaga 703a33108c chore(changelog): prepare for v5.25.2 (#10991) 2026-05-05 08:47:28 +02:00
Pepe Fagoaga 7c6d658154 fix(k8s): match RBAC rules by apiGroup, not just core (#10969)
Co-authored-by: Andoni A. <14891798+andoniaf@users.noreply.github.com>
2026-05-04 19:54:03 +02:00
Pepe Fagoaga 21d7d08b4b fix(timeline): Return a compact actor name from CloudTrail events (#10986) 2026-05-04 19:39:17 +02:00
Pepe Fagoaga f314725f4d fix(k8s): deduplicate RBAC findings by unique subject (#10242)
Co-authored-by: Andoni A. <14891798+andoniaf@users.noreply.github.com>
2026-05-04 18:11:38 +02:00
Rubén De la Torre Vico 02f43a7ad6 docs: add Prowler Studio page and remove check-kreator pages (#10981) 2026-05-04 17:51:02 +02:00
Daniel Barranquero 0dd8981ee4 feat: add issue template for creating new checks (#10976) 2026-05-04 17:47:39 +02:00
Rubén De la Torre Vico 269e51259d docs: add troubleshooting guide for stuck scans after worker crash (#10938) 2026-05-04 17:24:09 +02:00
20 changed files with 618 additions and 304 deletions
@@ -0,0 +1,143 @@
name: "🔎 New Check Request"
description: Request a new Prowler security check
title: "[New Check]: "
labels: ["feature-request", "status/needs-triage"]
body:
- type: checkboxes
id: search
attributes:
label: Existing check search
description: Confirm this check does not already exist before opening a new request.
options:
- label: I have searched existing issues, Prowler Hub, and the public roadmap, and this check does not already exist.
required: true
- type: markdown
attributes:
value: |
Use this form to describe the security condition that Prowler should evaluate.
The most useful inputs for [Prowler Studio](https://github.com/prowler-cloud/prowler-studio) are:
- What should be detected
- What PASS and FAIL mean
- Vendor docs, API references, SDK methods, CLI commands, or reference code
- type: dropdown
id: provider
attributes:
label: Provider
description: Cloud or platform this check targets.
options:
- AWS
- Azure
- GCP
- Kubernetes
- GitHub
- Microsoft 365
- OCI
- Alibaba Cloud
- Cloudflare
- MongoDB Atlas
- Google Workspace
- OpenStack
- Vercel
- NHN
- Other / New provider
validations:
required: true
- type: input
id: other_provider_name
attributes:
label: New provider name
description: Only fill this if you selected "Other / New provider" above.
placeholder: "NewProviderName"
validations:
required: false
- type: input
id: service_name
attributes:
label: Service or product area
description: Optional. Main service, product, or feature to audit.
placeholder: "s3, bedrock, entra, repository, apiserver"
validations:
required: false
- type: input
id: suggested_check_name
attributes:
label: Suggested check name
description: Optional. Use `snake_case` following `<service>_<resource>_<best_practice>`, with lowercase letters and underscores only.
placeholder: "bedrock_guardrail_sensitive_information_filter_enabled"
validations:
required: false
- type: textarea
id: context
attributes:
label: Context and goal
description: Describe the security problem, why it matters, and what this new check should help detect.
placeholder: |-
- Security condition to validate:
- Why it matters:
- Resource, feature, or configuration involved:
validations:
required: true
- type: textarea
id: expected_behavior
attributes:
label: Expected behavior
description: Explain what the check should evaluate and what PASS, FAIL, or MANUAL should mean.
placeholder: |-
- Resource or scope to evaluate:
- PASS when:
- FAIL when:
- MANUAL when (if applicable):
- Exclusions, thresholds, or edge cases:
validations:
required: true
- type: textarea
id: references
attributes:
label: References
description: Add vendor docs, API references, SDK methods, CLI commands, endpoint docs, sample payloads, or similar reference material.
placeholder: |-
- Product or service documentation:
- API or SDK reference:
- CLI command or endpoint documentation:
- Sample payload or response:
- Security advisory or benchmark:
validations:
required: true
- type: dropdown
id: severity
attributes:
label: Suggested severity
description: Your best estimate. Reviewers will confirm during triage.
options:
- Critical
- High
- Medium
- Low
- Informational
- Not sure
validations:
required: true
- type: textarea
id: implementation_notes
attributes:
label: Additional implementation notes
description: Optional. Add permissions, unsupported regions, config knobs, product limitations, or anything else that may affect implementation.
placeholder: |-
- Required permissions or scopes:
- Region, tenant, or subscription limitations:
- Configurable behavior or thresholds:
- Other constraints:
validations:
required: false
+6 -1
View File
@@ -44,7 +44,12 @@ repos:
rev: v1.24.1
hooks:
- id: zizmor
files: ^\.github/
# Scope to the inputs zizmor actually audits: workflows, composite
# actions, and dependabot config. The previous `^\.github/` regex
# also matched issue templates / labeler / CODEOWNERS, which made
# zizmor exit 3 ("no audit was performed") on commits that touch
# those files.
files: ^\.github/(workflows|actions)/.+\.ya?ml$|^\.github/dependabot\.ya?ml$
priority: 30
## BASH
+1 -1
View File
@@ -6,7 +6,7 @@ All notable changes to the **Prowler API** are documented in this file.
### 🚀 Added
- New `scan-reset-ephemeral-resources` post-scan task zeroes `failed_findings_count` for resources missing from the latest full-scope scan, keeping ephemeral resources from polluting the Resources page sort [(#10929)](https://github.com/prowler-cloud/prowler/pull/10929)
- `scan-reset-ephemeral-resources` post-scan task zeroes `failed_findings_count` for resources missing from the latest full-scope scan, keeping ephemeral resources from polluting the Resources page sort [(#10929)](https://github.com/prowler-cloud/prowler/pull/10929)
---
+19 -5
View File
@@ -27,14 +27,28 @@ The most common high level steps to create a new check are:
### Naming Format for Checks
Checks must be named following the format: `service_subservice_resource_action`.
If you already know the check name when creating a request or implementing a check, use a descriptive identifier with lowercase letters and underscores only.
Recommended patterns:
- `<service>_<resource>_<best_practice>`
The name components are:
- `service` The main service being audited (e.g., ec2, entra, iam, etc.)
- `subservice` An individual component or subset of functionality within the service that is being audited. This may correspond to a shortened version of the class attribute accessed within the check. If there is no subservice, just omit.
- `resource` The specific resource type being evaluated (e.g., instance, policy, role, etc.)
- `action` The security aspect or configuration being checked (e.g., public, encrypted, enabled, etc.)
- `service` The main service or product area being audited (e.g., ec2, entra, iam, bedrock).
- `resource` The resource, feature, or configuration being evaluated. It can be a single word or a compound phrase joined with underscores (e.g., instance, policy, guardrail, sensitive_information_filter).
- `best_practice` The expected secure state or best practice being checked (e.g., enabled, encrypted, restricted, configured, not_publicly_accessible).
Additional guidance:
- Use underscores only. Do not use hyphens.
- Keep the name specific enough to describe the behavior of the check.
- The first segment should match the service or product area whenever possible.
Examples:
- `s3_bucket_versioning_enabled`
- `bedrock_guardrail_sensitive_information_filter_enabled`
### File Creation
+131
View File
@@ -0,0 +1,131 @@
---
title: 'Prowler Studio'
---
**Prowler Studio is an AI workflow that ensures Claude Code follows Prowler's skills, guardrails, and best practices when creating new security checks.** What lands in the resulting pull request is consistent, tested, and ready for human review — not half-correct boilerplate that needs to be rewritten.
<Info>
**Contributor Tool**: Prowler Studio is a workflow for advanced contributors adding new Prowler security checks. It is not part of Prowler Cloud, Prowler App, or Prowler CLI.
</Info>
<Warning>
**Preview Feature**: Prowler Studio is under active development and breaking changes are expected. Please report issues or share feedback on [GitHub](https://github.com/prowler-cloud/prowler-studio/issues) or in the [Slack community](https://goto.prowler.com/slack).
</Warning>
<Card title="Prowler Studio Repository" icon="github" href="https://github.com/prowler-cloud/prowler-studio" horizontal>
Clone the source code, install Prowler Studio, and explore the agent workflow in detail.
</Card>
## The Problem
Adding a new check to [Prowler](https://github.com/prowler-cloud/prowler) is more than writing detection logic. A correct check has to:
- Match Prowler's exact service and check folder structure and naming conventions
- Wire up metadata, severity, remediation, tests, and compliance mappings
- Mirror the patterns used by the hundreds of existing checks in the same provider
- Actually load when Prowler scans for available checks — silent structural mistakes are easy to make
Asking a general-purpose AI assistant to do this usually means guessing. It misses conventions, skips tests, or invents structure that looks right but does not load. The result is a half-correct PR that needs to be reviewed line by line or rewritten.
## The Solution
Prowler Studio enforces the workflow end-to-end. Describe the check once — a markdown ticket, a Jira issue, or a GitHub issue — and the workflow:
1. **Loads Prowler-specific skills into every agent.** Every step starts with the same context an experienced Prowler engineer would have in mind. See [AI Skills System](/developer-guide/ai-skills) for how skills are structured.
2. **Runs specialized agents in sequence.** Implementation → testing → compliance mapping → review → PR creation. Each agent has one job and a tight scope.
3. **Verifies as it goes.** The check must load in Prowler. Tests must pass. If something fails, the agent fixes it and re-runs (up to a bounded number of attempts) before moving on.
4. **Produces a complete pull request.** Branch, passing check, tests, compliance mappings, and a pull request waiting for human review.
The result is a consistent starting point, every time, on every supported provider.
## Quick Start
### Install
Prowler Studio requires [`uv`](https://docs.astral.sh/uv/getting-started/installation/) — see the official [installation guide](https://docs.astral.sh/uv/getting-started/installation/).
```bash
git clone https://github.com/prowler-cloud/prowler-studio
cd prowler-studio
uv sync
source .venv/bin/activate
```
### Describe the Check
A ticket is a structured markdown description of the check to create. It is the only input the workflow needs; every agent (implementation, testing, compliance mapping, review, PR creation) uses it as the source of truth, so the more concrete it is, the closer the first PR will land to the desired outcome.
The ticket can be supplied in three ways:
- **Local markdown file** → `--ticket path/to/ticket.md`
- **Jira issue** → `--jira-url https://...` (uses the issue body)
- **GitHub issue** → `--github-url https://...` (uses the issue body)
The content should follow the **New Check Request** template:
- The local copy at [`check_ticket_template.md`](https://github.com/prowler-cloud/prowler-studio/blob/main/check_ticket_template.md) covers `--ticket` and Jira tickets.
- A prefilled GitHub form is also available: [Create a New Check Request issue](https://github.com/prowler-cloud/prowler/issues/new?template=new-check-request.yml).
Sections marked *Optional* can be skipped; everything else helps the agents make the right decisions.
### Run the Workflow
From a local markdown ticket:
```bash
prowler-studio --ticket check_ticket.md
```
From a Jira ticket:
```bash
prowler-studio --jira-url https://mycompany.atlassian.net/browse/PROJ-123
```
From a GitHub issue:
```bash
prowler-studio --github-url https://github.com/owner/repo/issues/123
```
<Note>
Provide exactly one of `--ticket`, `--jira-url`, or `--github-url`.
</Note>
Keep changes local (no push, no pull request):
```bash
prowler-studio -b feat/my-check --ticket check_ticket.md --local
```
### What You Get
After a successful run the working environment contains:
- A new branch on a clean Prowler worktree containing the check, metadata, tests, and compliance mappings
- A pull request opened against Prowler (skipped with `--local`)
- A timestamped log file under `logs/` capturing every step the agents took
## CLI Options
| Option | Short | Description |
|--------|-------|-------------|
| `--branch` | `-b` | Branch name (default: `feat/<ticket>-<check_name>` or `feat/<check_name>`) |
| `--ticket` | `-t` | Path to a markdown check ticket file |
| `--jira-url` | `-j` | Jira ticket URL (e.g., `https://mycompany.atlassian.net/browse/PROJ-123`) |
| `--github-url` | `-g` | GitHub issue URL (e.g., `https://github.com/owner/repo/issues/123`) |
| `--working-dir` | `-w` | Working directory for the Prowler clone (default: `./working`) |
| `--no-worktree` | | Legacy mode — work directly on the main clone instead of using worktrees |
| `--cleanup-worktree` | | Remove the worktree after a successful pull request is created |
| `--local` | | Keep changes local — skip push and pull request creation |
## Configuration
Set these environment variables depending on the input source:
| Variable | When Needed | Purpose |
|----------|-------------|---------|
| `GITHUB_TOKEN` | `--github-url` (recommended) | Higher GitHub API rate limits and access to private issues |
| `JIRA_SITE_URL` | `--jira-url` | Jira site, e.g. `https://mycompany.atlassian.net` |
| `JIRA_EMAIL` | `--jira-url` | Email of the Jira account used to fetch the ticket |
| `JIRA_API_TOKEN` | `--jira-url` | API token for the Jira account |
+2 -1
View File
@@ -365,7 +365,8 @@
"developer-guide/security-compliance-framework",
"developer-guide/lighthouse-architecture",
"developer-guide/mcp-server",
"developer-guide/ai-skills"
"developer-guide/ai-skills",
"developer-guide/prowler-studio"
]
},
{
+34
View File
@@ -159,6 +159,40 @@ When these environment variables are set, the API will use them directly instead
A fix addressing this permission issue is being evaluated in [PR #9953](https://github.com/prowler-cloud/prowler/pull/9953).
</Note>
### Scan Stuck in Executing State After Worker Crash
When running Prowler App via Docker Compose, a scan may remain indefinitely in the `executing` state if the worker process crashes (for example, due to an Out of Memory condition) before it can update the scan status. Since it is not currently possible to cancel a scan in `executing` state through the UI, the workaround is to manually update the scan record in the database.
**Root Cause:**
The Celery worker process terminates unexpectedly (OOM, node failure, etc.) before transitioning the scan state to `completed` or `failed`. The scan record remains in `executing` with no active process to advance it.
**Solution:**
Connect to the database using the `prowler_admin` user. Due to Row-Level Security (RLS), the default database user cannot see scan records — you must use `prowler_admin`:
```bash
psql -U prowler_admin -d prowler_db
```
Identify the stuck scan by filtering for scans in `executing` state:
```sql
SELECT id, name, state, started_at FROM scans WHERE state = 'executing';
```
Update the scan state to `failed` using the scan ID:
```sql
UPDATE scans SET state = 'failed' WHERE id = '<scan-id>';
```
After this change, the scan will appear as failed in the UI and you can launch a new scan.
<Note>
A feature to cancel executing scans directly from the UI is being tracked in [GitHub Issue #6893](https://github.com/prowler-cloud/prowler/issues/6893).
</Note>
### SAML/OAuth ACS URL Incorrect When Running Behind a Proxy or Load Balancer
See [GitHub Issue #9724](https://github.com/prowler-cloud/prowler/issues/9724) for more details.
@@ -1,47 +0,0 @@
---
title: 'Prowler Check Kreator'
---
<Note>
Currently, this tool is only available for creating checks for the AWS provider.
</Note>
<Note>
If you are looking for a way to create new checks for all the supported providers, you can use [Prowler Studio](https://github.com/prowler-cloud/prowler-studio), it is an AI-powered toolkit for generating and managing security checks for Prowler (better version of the Check Kreator).
</Note>
## Introduction
**Prowler Check Kreator** is a utility designed to streamline the creation of new checks for Prowler. This tool generates all necessary files required to add a new check to the Prowler repository. Specifically, it creates:
- A dedicated folder for the check.
- The main check script.
- A metadata file with essential details.
- A folder and file structure for testing the check.
## Usage
To use the tool, execute the main script with the following command:
```bash
python util/prowler_check_kreator/prowler_check_kreator.py <prowler_provider> <check_name>
```
Parameters:
- `<prowler_provider>`: Currently only AWS is supported.
- `<check_name>`: The name you wish to assign to the new check.
## AI integration
This tool optionally integrates AI to assist in generating the check code and metadata file content. When AI assistance is chosen, the tool uses [Gemini](https://gemini.google.com/) to produce preliminary code and metadata.
<Note>
For this feature to work, you must have the library `google-generativeai` installed in your Python environment.
</Note>
<Warning>
AI-generated code and metadata might contain errors or require adjustments to align with specific Prowler requirements. Carefully review all AI-generated content before committing.
</Warning>
To enable AI assistance, simply confirm when prompted by the tool. Additionally, ensure that the `GEMINI_API_KEY` environment variable is set with a valid Gemini API key. For instructions on obtaining your API key, refer to the [Gemini documentation](https://ai.google.dev/gemini-api/docs/api-key).
@@ -1,51 +0,0 @@
---
title: 'Prowler Check Kreator'
---
<Note>
Currently, this tool is only available for creating checks for the AWS provider.
</Note>
<Note>
If you are looking for a way to create new checks for all the supported providers, you can use [Prowler Studio](https://github.com/prowler-cloud/prowler-studio), it is an AI-powered toolkit for generating and managing security checks for Prowler (better version of the Check Kreator).
</Note>
## Introduction
**Prowler Check Kreator** is a utility designed to streamline the creation of new checks for Prowler. This tool generates all necessary files required to add a new check to the Prowler repository. Specifically, it creates:
- A dedicated folder for the check.
- The main check script.
- A metadata file with essential details.
- A folder and file structure for testing the check.
## Usage
To use the tool, execute the main script with the following command:
```bash
python util/prowler_check_kreator/prowler_check_kreator.py <prowler_provider> <check_name>
```
Parameters:
- `<prowler_provider>`: Currently only AWS is supported.
- `<check_name>`: The name you wish to assign to the new check.
## AI integration
This tool optionally integrates AI to assist in generating the check code and metadata file content. When AI assistance is chosen, the tool uses [Gemini](https://gemini.google.com/) to produce preliminary code and metadata.
<Note>
For this feature to work, you must have the library `google-generativeai` installed in your Python environment.
</Note>
<Warning>
AI-generated code and metadata might contain errors or require adjustments to align with specific Prowler requirements. Carefully review all AI-generated content before committing.
</Warning>
To enable AI assistance, simply confirm when prompted by the tool. Additionally, ensure that the `GEMINI_API_KEY` environment variable is set with a valid Gemini API key. For instructions on obtaining your API key, refer to the [Gemini documentation](https://ai.google.dev/gemini-api/docs/api-key).
+12 -2
View File
@@ -14,9 +14,8 @@ All notable changes to the **Prowler SDK** are documented in this file.
### 🔄 Changed
- `route53_dangling_ip_subdomain_takeover` now also flags `CNAME` records pointing to S3 website endpoints whose buckets are missing from the account [(#10920)](https://github.com/prowler-cloud/prowler/pull/10920)
- Azure Network Watcher flow log checks now require workspace-backed Traffic Analytics for `network_flow_log_captured_sent` and align metadata with VNet-compatible flow log guidance [(#10645)](https://github.com/prowler-cloud/prowler/pull/10645)
- Azure compliance entries for legacy Network Watcher flow log controls now use retirement-aware guidance and point new deployments to VNet flow logs
- Azure compliance entries for legacy Network Watcher flow log controls now use retirement-aware guidance and point new deployments to VNet flow logs [(#10937)](https://github.com/prowler-cloud/prowler/pull/10937)
- AWS CodeBuild service now batches `BatchGetProjects` and `BatchGetBuilds` calls per region (up to 100 items per call) to reduce API call volume and prevent throttling-induced false positives in `codebuild_project_not_publicly_accessible` [(#10639)](https://github.com/prowler-cloud/prowler/pull/10639)
- `display_compliance_table` dispatch switched from substring `in` checks to `startswith` to prevent false matches between similarly named frameworks (e.g. `cisa` vs `cis`) [(#10301)](https://github.com/prowler-cloud/prowler/pull/10301)
@@ -32,6 +31,17 @@ All notable changes to the **Prowler SDK** are documented in this file.
---
## [5.25.2] (Prowler v5.25.2)
### 🐞 Fixed
- `route53_dangling_ip_subdomain_takeover` now also flags `CNAME` records pointing to S3 website endpoints whose buckets are missing from the account [(#10920)](https://github.com/prowler-cloud/prowler/pull/10920)
- Duplicate Kubernetes RBAC findings when the same User or Group subject appeared in multiple ClusterRoleBindings [(#10242)](https://github.com/prowler-cloud/prowler/pull/10242)
- Match K8s RBAC rules by `apiGroup` [(#10969)](https://github.com/prowler-cloud/prowler/pull/10969)
- Return a compact actor name from CloudTrail `userIdentity` events [(#10986)](https://github.com/prowler-cloud/prowler/pull/10986)
---
## [5.25.1] (Prowler v5.25.1)
### 🐞 Fixed
@@ -221,27 +221,12 @@ class CloudTrailTimeline(TimelineService):
@staticmethod
def _extract_actor(user_identity: Dict[str, Any]) -> str:
"""Extract a human-readable actor name from CloudTrail userIdentity."""
# Try ARN first - most reliable
"""Return a compact actor name from CloudTrail userIdentity.
For ARNs, returns the resource portion (everything after the last
`:`) — e.g. `user/alice`, `assumed-role/MyRole/session-name`,
`root`. The full ARN is preserved separately in `actor_uid`.
"""
if arn := user_identity.get("arn"):
if "/" in arn:
parts = arn.split("/")
# For assumed-role, return the role name (second-to-last part)
if "assumed-role" in arn and len(parts) >= 2:
return parts[-2]
return parts[-1]
return arn.split(":")[-1]
# Fall back to userName
if username := user_identity.get("userName"):
return username
# Fall back to principalId
if principal_id := user_identity.get("principalId"):
return principal_id
# For service-invoked actions
if invoking_service := user_identity.get("invokedBy"):
return invoking_service
return "Unknown"
return arn.rsplit(":", 1)[-1]
return user_identity.get("invokedBy") or "Unknown"
@@ -1,36 +1,37 @@
def is_rule_allowing_permissions(rules, resources, verbs):
def is_rule_allowing_permissions(rules, resources, verbs, api_groups=("",)):
"""
Check Kubernetes role permissions.
Check whether any RBAC rule grants the specified verbs on the specified
resources within the specified API groups.
This function takes in Kubernetes role rules, resources, and verbs,
and checks if any of the rules grant permissions on the specified
resources with the specified verbs.
A rule matches when its `apiGroups` includes any of `api_groups` (or "*"),
its `resources` includes any of `resources` (or "*"), and its `verbs`
includes any of `verbs` (or "*").
Args:
rules (List[Rule]): The list of Kubernetes role rules.
resources (List[str]): The list of resources to check permissions for.
verbs (List[str]): The list of verbs to check permissions for.
rules (List[Rule]): RBAC rules from a Role or ClusterRole.
resources (List[str]): Resources (or sub-resources) to check.
verbs (List[str]): Verbs to check.
api_groups (Iterable[str]): API groups the resources live in. Defaults
to ("",), the core API group, which matches the most common case.
Pass an explicit value for resources outside the core group, e.g.
("admissionregistration.k8s.io",) for webhook configurations.
Returns:
bool: True if any of the rules grant permissions, False otherwise.
bool: True if any rule grants the permission, False otherwise.
"""
if rules:
# Iterate through each rule in the list of rules
for rule in rules:
# Ensure apiGroups are relevant ("" or "v1" for secrets)
if rule.apiGroups and all(api not in ["", "v1"] for api in rule.apiGroups):
continue # Skip rules with unrelated apiGroups
# Check if the rule has resources, verbs, and matches any of the specified resources and verbs
if (
rule.resources
and (
any(resource in rule.resources for resource in resources)
or "*" in rule.resources
)
and rule.verbs
and (any(verb in rule.verbs for verb in verbs) or "*" in rule.verbs)
):
# If the rule matches, return True
return True
# If no rule matches, return False
if not rules:
return False
for rule in rules:
rule_api_groups = rule.apiGroups or [""]
if not (
any(g in rule_api_groups for g in api_groups) or "*" in rule_api_groups
):
continue
if (
rule.resources
and (any(r in rule.resources for r in resources) or "*" in rule.resources)
and rule.verbs
and (any(v in rule.verbs for v in verbs) or "*" in rule.verbs)
):
return True
return False
@@ -6,29 +6,40 @@ from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client
verbs = ["update", "patch"]
resources = ["certificatesigningrequests/approval"]
api_groups = ["certificates.k8s.io"]
class rbac_minimize_csr_approval_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to update the CSR approval sub-resource."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(
cr.rules,
resources,
verbs,
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to update the CSR approval sub-resource."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to update the CSR approval sub-resource."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(
cr.rules, resources, verbs, api_groups
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to update the CSR approval sub-resource."
break
findings.append(report)
return findings
@@ -11,20 +11,32 @@ resources = ["nodes/proxy"]
class rbac_minimize_node_proxy_subresource_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to the node proxy sub-resource."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to the node proxy sub-resource."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to the node proxy sub-resource."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to the node proxy sub-resource."
break
findings.append(report)
return findings
@@ -11,21 +11,32 @@ resources = ["persistentvolumes"]
class rbac_minimize_pv_creation_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Check each ClusterRoleBinding for access to create PersistentVolumes
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create PersistentVolumes."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create PersistentVolumes."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create PersistentVolumes."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create PersistentVolumes."
break
findings.append(report)
return findings
@@ -11,20 +11,32 @@ resources = ["serviceaccounts/token"]
class rbac_minimize_service_account_token_creation(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create service account tokens."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create service account tokens."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create service account tokens."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create service account tokens."
break
findings.append(report)
return findings
@@ -9,29 +9,40 @@ resources = [
"mutatingwebhookconfigurations",
]
verbs = ["create", "update", "delete"]
api_groups = ["admissionregistration.k8s.io"]
class rbac_minimize_webhook_config_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create, update, or delete webhook configurations."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(
cr.rules,
resources,
verbs,
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create, update, or delete webhook configurations."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create, update, or delete webhook configurations."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(
cr.rules, resources, verbs, api_groups
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create, update, or delete webhook configurations."
break
findings.append(report)
return findings
@@ -100,7 +100,7 @@ class TestCloudTrailTimeline:
assert len(result) == 1
assert result[0]["event_name"] == "RunInstances"
assert result[0]["actor"] == "admin"
assert result[0]["actor"] == "user/admin"
assert result[0]["source_ip_address"] == "203.0.113.1"
def test_get_resource_timeline_with_resource_uid(
@@ -304,14 +304,28 @@ class TestExtractActor:
"arn": "arn:aws:iam::123456789012:user/alice",
"userName": "alice",
}
assert CloudTrailTimeline._extract_actor(user_identity) == "alice"
assert CloudTrailTimeline._extract_actor(user_identity) == "user/alice"
def test_extract_actor_assumed_role(self):
user_identity = {
"type": "AssumedRole",
"arn": "arn:aws:sts::123456789012:assumed-role/MyRole/session-name",
}
assert CloudTrailTimeline._extract_actor(user_identity) == "MyRole"
assert (
CloudTrailTimeline._extract_actor(user_identity)
== "assumed-role/MyRole/session-name"
)
def test_extract_actor_assumed_role_sso(self):
"""SSO sessions store the user identity in the session name."""
user_identity = {
"type": "AssumedRole",
"arn": "arn:aws:sts::123456789012:assumed-role/AWSReservedSSO_AdministratorAccess_abcdef1234567890/user@example.com",
}
assert (
CloudTrailTimeline._extract_actor(user_identity)
== "assumed-role/AWSReservedSSO_AdministratorAccess_abcdef1234567890/user@example.com"
)
def test_extract_actor_root(self):
user_identity = {"type": "Root", "arn": "arn:aws:iam::123456789012:root"}
@@ -327,21 +341,33 @@ class TestExtractActor:
== "elasticloadbalancing.amazonaws.com"
)
def test_extract_actor_fallback_to_principal_id(self):
user_identity = {"type": "Unknown", "principalId": "AROAEXAMPLEID:session"}
assert (
CloudTrailTimeline._extract_actor(user_identity) == "AROAEXAMPLEID:session"
)
def test_extract_actor_unknown(self):
assert CloudTrailTimeline._extract_actor({}) == "Unknown"
def test_extract_actor_username_only_returns_unknown(self):
"""When userIdentity carries only userName/principalId (no arn or
invokedBy), we deliberately return "Unknown" we rely on the ARN
from the upstream service for the actor."""
assert (
CloudTrailTimeline._extract_actor({"type": "IAMUser", "userName": "alice"})
== "Unknown"
)
assert (
CloudTrailTimeline._extract_actor(
{"type": "Unknown", "principalId": "AROAEXAMPLEID:session"}
)
== "Unknown"
)
def test_extract_actor_federated_user(self):
user_identity = {
"type": "FederatedUser",
"arn": "arn:aws:sts::123456789012:federated-user/developer",
}
assert CloudTrailTimeline._extract_actor(user_identity) == "developer"
assert (
CloudTrailTimeline._extract_actor(user_identity)
== "federated-user/developer"
)
class TestParseEvent:
@@ -380,7 +406,7 @@ class TestParseEvent:
assert result is not None
assert result["event_name"] == "RunInstances"
assert result["event_source"] == "ec2.amazonaws.com"
assert result["actor"] == "admin"
assert result["actor"] == "user/admin"
assert result["actor_uid"] == "arn:aws:iam::123456789012:user/admin"
assert result["actor_type"] == "IAMUser"
@@ -424,7 +450,10 @@ class TestParseEvent:
"EventName": "RunInstances",
"EventSource": "ec2.amazonaws.com",
"CloudTrailEvent": {
"userIdentity": {"type": "IAMUser", "userName": "admin"},
"userIdentity": {
"type": "IAMUser",
"arn": "arn:aws:iam::123456789012:user/admin",
},
},
}
timeline = CloudTrailTimeline(session=mock_session)
@@ -432,7 +461,7 @@ class TestParseEvent:
assert result is not None
assert result["event_name"] == "RunInstances"
assert result["actor"] == "admin"
assert result["actor"] == "user/admin"
def test_parse_event_missing_event_id(self, mock_session):
"""Test parsing event without EventId returns None (event_id is required)."""
@@ -506,7 +535,7 @@ class TestParseEvent:
assert result is not None
assert result["event_name"] == "RunInstances"
assert result["actor"] == "admin"
assert result["actor"] == "user/admin"
# actor_type should be None when not present in userIdentity
assert result["actor_type"] is None
@@ -6,90 +6,92 @@ from prowler.providers.kubernetes.services.rbac.rbac_service import Rule
class TestCheckRolePermissions:
def test_is_rule_allowing_permissions(self):
# Define some sample rules, resources, and verbs for testing
rules = [
# Rule 1: Allows 'get' and 'list' on 'pods' and 'services'
Rule(resources=["pods", "services"], verbs=["get", "list"]),
# Rule 2: Allows 'create' and 'delete' on 'deployments'
Rule(resources=["deployments"], verbs=["create", "delete"]),
]
resources = ["pods", "deployments"]
verbs = ["get", "create"]
assert is_rule_allowing_permissions(rules, resources, verbs)
assert is_rule_allowing_permissions(
rules, ["pods", "deployments"], ["get", "create"]
)
def test_no_permissions(self):
# Test when there are no rules
rules = []
resources = ["pods", "deployments"]
verbs = ["get", "create"]
assert not is_rule_allowing_permissions(rules, resources, verbs)
assert not is_rule_allowing_permissions([], ["pods"], ["get"])
def test_no_matching_rules(self):
# Test when there are rules, but none match the specified resources and verbs
rules = [
Rule(resources=["services"], verbs=["get", "list"]),
Rule(resources=["pods"], verbs=["create", "delete"]),
]
resources = ["deployments", "configmaps"]
verbs = ["get", "create"]
assert not is_rule_allowing_permissions(rules, resources, verbs)
assert not is_rule_allowing_permissions(
rules, ["deployments", "configmaps"], ["get", "create"]
)
def test_empty_rules(self):
# Test when the rules list is empty
rules = []
resources = ["pods", "deployments"]
verbs = ["get", "create"]
assert not is_rule_allowing_permissions(rules, resources, verbs)
assert not is_rule_allowing_permissions([], ["pods"], ["get"])
def test_empty_resources_and_verbs(self):
# Test when resources and verbs are empty lists
rules = [
Rule(resources=["pods"], verbs=["get"]),
Rule(resources=["services"], verbs=["list"]),
]
resources = []
verbs = []
assert not is_rule_allowing_permissions(rules, resources, verbs)
rules = [Rule(resources=["pods"], verbs=["get"])]
assert not is_rule_allowing_permissions(rules, [], [])
def test_matching_rule_with_empty_resources_or_verbs(self):
# Test when a rule matches, but either resources or verbs are empty
rules = [Rule(resources=["pods"], verbs=["get"])]
assert not is_rule_allowing_permissions(rules, [], ["get"])
assert not is_rule_allowing_permissions(rules, ["pods"], [])
def test_rule_with_non_matching_api_group(self):
rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=["apps"])]
assert not is_rule_allowing_permissions(rules, ["pods"], ["get"])
def test_rule_with_matching_api_group(self):
rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=[""])]
assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
def test_default_api_group_is_core(self):
rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=None)]
assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
def test_rule_with_empty_api_groups_does_not_match_non_core_request(self):
rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=None)]
assert not is_rule_allowing_permissions(
rules, ["pods"], ["get"], ["admissionregistration.k8s.io"]
)
def test_non_core_rule_does_not_match_without_api_groups_argument(self):
rules = [
Rule(resources=["pods"], verbs=["get"]),
Rule(resources=["services"], verbs=["list"]),
Rule(
resources=["validatingwebhookconfigurations"],
verbs=["create"],
apiGroups=["admissionregistration.k8s.io"],
)
]
resources = []
verbs = ["get"]
assert not is_rule_allowing_permissions(
rules, ["validatingwebhookconfigurations"], ["create"]
)
assert not is_rule_allowing_permissions(rules, resources, verbs)
resources = ["pods"]
verbs = []
assert not is_rule_allowing_permissions(rules, resources, verbs)
def test_rule_with_ignored_api_groups(self):
# Test when a rule has apiGroups that are not relevant
def test_explicit_non_core_api_group(self):
rules = [
Rule(resources=["pods"], verbs=["get"], apiGroups=["test"]),
Rule(resources=["services"], verbs=["list"], apiGroups=["test2"]),
Rule(
resources=["validatingwebhookconfigurations"],
verbs=["create"],
apiGroups=["admissionregistration.k8s.io"],
)
]
resources = ["pods"]
verbs = ["get"]
assert is_rule_allowing_permissions(
rules,
["validatingwebhookconfigurations"],
["create"],
["admissionregistration.k8s.io"],
)
assert not is_rule_allowing_permissions(rules, resources, verbs)
def test_rule_with_wildcard_api_group(self):
rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=["*"])]
assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
assert is_rule_allowing_permissions(rules, ["pods"], ["get"], ["apps"])
def test_rule_with_relevant_api_groups(self):
# Test when a rule has apiGroups that are relevant
rules = [
Rule(resources=["pods"], verbs=["get"], apiGroups=["", "v1"]),
Rule(resources=["services"], verbs=["list"], apiGroups=["test2"]),
]
resources = ["pods"]
verbs = ["get"]
def test_rule_with_wildcard_resources(self):
rules = [Rule(resources=["*"], verbs=["get"], apiGroups=[""])]
assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
assert is_rule_allowing_permissions(rules, resources, verbs)
def test_rule_with_wildcard_verbs(self):
rules = [Rule(resources=["pods"], verbs=["*"], apiGroups=[""])]
assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
+1 -1
View File
@@ -2,7 +2,7 @@
All notable changes to the **Prowler UI** are documented in this file.
## [1.25.2] (Prowler UNRELEASED)
## [1.25.2] (Prowler v5.25.2)
### 🔄 Changed